Generalized Azure Blob Storage Source Connector for Confluent Platform

Note

If you are using Confluent Cloud, see the cloud Quick Start in Azure Blob Storage Source Connector for Confluent Cloud.

The Generalized Kafka Connect Azure Blob Storage Source connector can read files stored in an Azure Blob Storage container regardless of the file naming convention; the filenames don’t have to be in a specific format. As long as the files are in one of the supported formats (for example, JSON, Avro, and Byte Array), the connector can read them.

Features

The Generalized Azure Blob Storage Source connector includes the following features:

At least once delivery

In the event of a task failure, the connector guarantees that no messages are lost, although the last few messages may be processed again.

Multiple tasks

The Generalized Azure Blob Storage Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Multiple tasks may improve performance when moving a large amount of data.

Pluggable data format with or without schema

Out of the box, the connector supports reading data from Azure Blob Storage in Avro and JSON format. Besides records with schema, the connector supports importing plain JSON records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the Format interface.
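
To make the schemaless case concrete, the following minimal Python sketch splits a text file that holds one JSON record per line into individual records. It only illustrates the file layout described above; it is not the connector's implementation, and the function name parse_json_lines is hypothetical.

```python
import json


def parse_json_lines(text):
    """Parse line-delimited JSON: one record per non-empty line."""
    records = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between records
        records.append(json.loads(line))
    return records


sample = '{"f1": "value1"}\n{"f1": "value2"}\n'
print(parse_json_lines(sample))
```

Each parsed record here is a plain dictionary with no schema attached, which mirrors the "plain JSON records without schema" case.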

Limitations

The Generalized Azure Blob Storage Source connector has the following limitations:

  • The connector won’t reload data during the following scenarios:
    • Renaming a file which the connector has already read.
    • Uploading a newer version of a file with a new record.

You should also be aware of the following connector actions:

  • The connector ignores any Azure Blob Storage object with a name that does not start with the configured topics.dir directory. This name is topics/ by default.
  • The connector uses the connector name to store offsets that track how much of the container it has processed. If you delete a connector and create a new one with the same name, the new connector does not reprocess from the beginning; it resumes from the progress saved by the original connector, unless the corresponding entry in the offset topic is cleared.
  • For a new container, you need to create a new connector with an unused name. If you reconfigure an existing connector to source from the new container, or create a connector with a name that is used for another connector, the connector will not source from the beginning of data stored in the container. This is because the connector will maintain offsets tied to the connector name.
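
The topics.dir rule in the first item above can be pictured with a short Python sketch. The helper name is_eligible is hypothetical, and the exact comparison the connector performs is an assumption; the sketch simply treats the configured directory as a name prefix.

```python
def is_eligible(blob_name, topics_dir="topics"):
    """Return True if the blob name starts with the configured directory.

    Assumption: the directory is compared as a plain prefix followed by "/".
    """
    prefix = topics_dir.rstrip("/") + "/"
    return blob_name.startswith(prefix)


for name in ["topics/data-1.json", "archive/data-2.json"]:
    print(name, is_eligible(name))
```

Running a check like this against a listing of blob names is a quick way to spot files that the connector would skip because they sit outside topics.dir.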

Prerequisites

The following are required to run the Kafka Connect Generalized Azure Blob Storage Source connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
  • Java 1.8

Install the Generalized Azure Blob Storage Source Connector

You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Prerequisites

Important

You must install the connector on every machine where Connect will run.

  • An installation of the Confluent Hub Client.

    Note

    This is installed by default with Confluent Enterprise.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent-hub install confluentinc/kafka-connect-azure-blob-storage-source:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent-hub install confluentinc/kafka-connect-azure-blob-storage-source:1.0.0-preview
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and Confluent license properties for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Azure Blob Storage Source Connector Configuration Properties.

Quick Start

In the following scenario, the Azure Blob Storage Source connector reads all data listed under a specific container and then loads it into a Kafka topic. You can use any file naming convention when writing data to the Azure Blob Storage container.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

  1. Upload the following data under a folder named quickstart within the targeted Azure Blob Storage container. In this example, JSON format is used, which supports the following: line-delimited JSON, concatenated JSON, and a JSON array of records.

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
    
  2. Install the connector using the Confluent Hub Client by running the following command from your Confluent Platform installation directory:

    confluent-hub install confluentinc/kafka-connect-azure-blob-storage-source:latest
    

    Tip

    By default, the plugin is installed into share/confluent-hub-components and the directory is added to the plugin path. If this is the first connector you have installed, you may need to restart the connect server for the plugin path change to take effect.

  3. Create a quickstart-azure-blob-storage-source-generalized.properties file with the following contents:

    name=quickstart-azure-blob-storage-source
    connector.class=io.confluent.connect.azure.blob.storage.AzureBlobStorageSourceConnector
    tasks.max=1
    value.converter=org.apache.kafka.connect.json.JsonConverter
    mode=GENERIC
    topics.dir=quickstart
    format.class=io.confluent.connect.azure.blob.storage.format.json.JsonFormat
    topic.regex.list=quick-start-topic:.*
    azblob.account.name=my-azure-blob-storage-account
    azblob.account.key=my-azure-blob-storage-key
    azblob.container.name=my-azure-blob-storage-container
    value.converter.schemas.enable=false
    
  4. Load the Generalized Azure Blob Storage Source connector.

    Tip

    The syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services connect connector load quickstart-azure-blob-storage-source --config quickstart-azure-blob-storage-source-generalized.properties
    

    Important

    Don’t use the Confluent CLI in production environments.

  5. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status quickstart-azure-blob-storage-source
    
  6. Confirm that the messages are being sent to Kafka.

    kafka-console-consumer \
        --bootstrap-server localhost:9092 \
        --topic quick-start-topic \
        --from-beginning
    
  7. The response should be 9 records as follows.

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
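
Because the Confluent CLI is not meant for production, the same configuration is often submitted to the Kafka Connect REST API as a JSON body of the form {"name": ..., "config": {...}}. The following Python sketch converts simple key=value properties text into that shape. The function name properties_to_payload is hypothetical, and the parser is deliberately simplified: it does not handle escapes, line continuations, or ":" separators that the Java properties format allows.

```python
import json


def properties_to_payload(text):
    """Convert simple key=value lines into a Connect REST request body."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        # Split on the first "=" only, so values may contain "=".
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    name = config.pop("name")
    return {"name": name, "config": config}


props = """
name=quickstart-azure-blob-storage-source
tasks.max=1
topics.dir=quickstart
"""
print(json.dumps(properties_to_payload(props), indent=2))
```

The resulting JSON could then be sent with a POST request to the /connectors endpoint of a Connect worker; the worker address depends on your deployment.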
    

Generalized Azure Blob Storage Source Connector Data Formats

The Generalized Azure Blob Storage Source connector can read different file formats in Azure Blob Storage and convert them into Kafka records. This is controlled by the connector’s format.class configuration property, which has several options:

  • Avro: Use format.class=io.confluent.connect.cloud.storage.source.format.CloudStorageAvroFormat to source Avro container files.
  • JSON: Use format.class=io.confluent.connect.cloud.storage.source.format.CloudStorageJsonFormat to source JSON files. Supported JSON formats are line-delimited JSON, record separator-delimited JSON, and concatenated JSON.
  • Raw Bytes: Use format.class=io.confluent.connect.cloud.storage.source.format.CloudStorageByteArrayFormat to parse the Azure Blob Storage object content as raw bytes. The default line separator will be the newline character, but this can be customized with the format.bytearray.separator configuration property.
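
The difference between line-delimited and concatenated JSON is easiest to see in code. The following Python sketch decodes JSON values that follow one another with or without whitespace in between, so it accepts both layouts. It is an illustration of the formats only, not the connector's parser, and the function name parse_concatenated_json is hypothetical.

```python
import json


def parse_concatenated_json(text):
    """Decode consecutive JSON values separated by optional whitespace."""
    decoder = json.JSONDecoder()
    records = []
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so do it here.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos == len(text):
            break
        value, pos = decoder.raw_decode(text, pos)
        records.append(value)
    return records


concatenated = '{"f1": "value1"}{"f1": "value2"}'
line_delimited = '{"f1": "value1"}\n{"f1": "value2"}\n'
print(parse_concatenated_json(concatenated))
print(parse_concatenated_json(line_delimited))
```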

Troubleshooting Connector and Task Failures

Stack trace

You can use the Kafka Connect REST Interface to check the status of the connectors and tasks. If a task or connector has failed, the trace field will include a reason and a stack trace.
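
As a rough illustration, the Python sketch below reads a connector status document and collects the trace of every failed connector or task. The status endpoint is GET /connectors/{name}/status; the base URL http://localhost:8083, the helper names fetch_status and failed_traces, and the sample payload are assumptions made for this example.

```python
import json
from urllib.request import urlopen


def fetch_status(name, base_url="http://localhost:8083"):
    """Fetch the status document for a connector (base_url is an assumption)."""
    with urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def failed_traces(status):
    """Collect (component, trace) pairs for each FAILED connector or task."""
    failures = []
    connector = status.get("connector", {})
    if connector.get("state") == "FAILED":
        failures.append(("connector", connector.get("trace", "")))
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            failures.append((f"task-{task.get('id')}", task.get("trace", "")))
    return failures


# Illustrative payload only; a real response comes from fetch_status(...).
sample = {
    "name": "quickstart-azure-blob-storage-source",
    "connector": {"state": "RUNNING", "worker_id": "worker-1"},
    "tasks": [
        {"id": 0, "state": "FAILED", "worker_id": "worker-1",
         "trace": "illustrative stack trace text"},
    ],
}
print(failed_traces(sample))
```

Against a live worker, failed_traces(fetch_status("quickstart-azure-blob-storage-source")) would return an empty list when everything is running.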

Stack Trace message: No new files ready after scan task…

If this message is displayed, complete the following steps:

  1. Review your topics.dir configuration property to ensure you have configured the right folder under the Azure Blob Storage container. If you do not set this parameter, the connector expects the data to be under the default folder, which is topics.
  2. Review the topic.regex.list configuration property to ensure your expression matches your data in the Azure Blob Storage container to the Kafka topic.
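
To check a topic-to-regex mapping before deploying it, a small Python sketch such as the following can help. It assumes the property value is a comma-separated list of topic:regex entries, as in the quick start value quick-start-topic:.*, that the expression must match the whole name, and that the first matching entry wins. These matching rules, the helper names, and the use of Python's regex engine in place of Java's are all assumptions, so treat the result as a sanity check only.

```python
import re


def parse_topic_regex_list(value):
    """Split 'topic:regex,topic:regex' into (topic, compiled pattern) pairs.

    Assumption: the regular expressions themselves contain no commas.
    """
    mappings = []
    for entry in value.split(","):
        topic, _, pattern = entry.strip().partition(":")
        mappings.append((topic, re.compile(pattern)))
    return mappings


def topic_for(blob_name, mappings):
    """Return the first topic whose pattern matches the whole name, else None."""
    for topic, pattern in mappings:
        if pattern.fullmatch(blob_name):
            return topic
    return None


mappings = parse_topic_regex_list("quick-start-topic:.*")
print(topic_for("quickstart/data.json", mappings))
```

If topic_for returns None for a file you expect to be sourced, the expression is a likely cause of the "No new files ready" message.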