Azure Data Lake Storage Gen2 Sink Connector for Confluent Platform

Note

If you are using Confluent Cloud, see Azure Data Lake Storage Gen2 Sink Connector for Confluent Cloud for the cloud Quick Start.

You can use the Azure Data Lake Storage Gen2 connector, currently available as a sink connector, to export data from Apache Kafka® topics to Azure Data Lake Storage Gen2 files in Avro, JSON, or ByteArray format. Depending on your configuration, the Azure Data Lake Storage Gen2 connector can export data with exactly-once delivery semantics to consumers of the Azure Data Lake Storage Gen2 files it produces.

The Azure Data Lake Storage Gen2 sink connector periodically polls data from Kafka and uploads it to Azure Data Lake Storage Gen2. A partitioner splits the data of every Kafka partition into chunks, and each chunk of data is written as one Azure Data Lake Storage Gen2 file. The file name encodes the topic, the Kafka partition, and the start offset of the data chunk. If no partitioner is specified in the configuration, the default partitioner, which preserves Kafka partitioning, is used. The size of each data chunk is determined by the number of records written to Azure Data Lake Storage Gen2 and by schema compatibility.
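The following Python sketch is a rough illustration of the chunking and naming scheme described above. It splits one partition's records into chunks of flush.size records and derives each file name from the topic, the Kafka partition, and the chunk's start offset. It assumes the default partitioner, ignores schema changes and time-based rotation, and uses the zero-padded 10-digit offset that appears in the Quick Start output. The function name chunk_file_names is hypothetical and is not part of the connector.

```python
def chunk_file_names(topic, partition, offsets, flush_size, extension="avro"):
    """Return one file name per full chunk of `flush_size` consecutive records.

    Simplified model: only full chunks are committed; a trailing partial
    chunk stays open until more records arrive.
    """
    names = []
    for start in range(0, len(offsets) - flush_size + 1, flush_size):
        start_offset = offsets[start]
        # <topic>+<kafkaPartition>+<startOffset>.<format>, offset zero-padded to 10 digits
        names.append(f"{topic}+{partition}+{start_offset:010d}.{extension}")
    return names


# Nine records at offsets 0..8 with flush.size=3, as in the Quick Start below.
for name in chunk_file_names("datalake_topic", 0, list(range(9)), flush_size=3):
    print(name)
```

With ten records instead of nine, this model still produces three files. The tenth record waits in an open chunk until two more records arrive.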

Note

This connector is not compatible with Azure Data Lake Storage Gen1.

Features

The Microsoft Azure Data Lake Storage Gen2 connector offers a variety of features:

  • Exactly Once Delivery: Records that are exported using a deterministic partitioner are delivered with exactly-once semantics.
  • Pluggable Data Format with or without Schema: Out of the box, the connector supports writing data to Azure Data Lake Storage Gen2 in Avro and JSON format. Besides records with schema, the connector supports exporting plain JSON and byte array records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the Format interface.
  • Schema Evolution: When schemas are used, the connector supports schema evolution based on schema compatibility modes. The available modes are NONE, BACKWARD, FORWARD, and FULL. Select a mode by setting the schema.compatibility property in the connector’s configuration. When the connector observes a schema change, it decides whether to roll the file or project the record to the proper schema according to the schema.compatibility setting in use.
  • Pluggable Partitioner: The connector comes out of the box with partitioners that support default partitioning based on Apache Kafka® partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class. Additionally, you can customize time-based partitioning by extending the TimeBasedPartitioner class.
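The following simplified Python model makes the partitioner options more concrete. It shows the directory prefixes produced by default and field partitioning. The partition=<n> form matches the Quick Start output. The <field>=<value> form for field partitioning is an assumption modeled on the common storage-connector layout. Both helper names are hypothetical and do not represent the connector's Java Partitioner API.

```python
def default_path(topic, kafka_partition):
    # Preserves Kafka partitioning, e.g. "datalake_topic/partition=0".
    return f"{topic}/partition={kafka_partition}"


def field_path(topic, record, fields):
    # Assumed layout: one "<field>=<value>" directory level per configured field.
    parts = [f"{name}={record[name]}" for name in fields]
    return "/".join([topic] + parts)


print(default_path("datalake_topic", 0))
print(field_path("datalake_topic", {"f1": "value1", "region": "eu"}, ["region"]))
```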

Note

This connector is not compatible with SMTs that change the name of the topic.

Prerequisites

The following are required to run the Kafka Connect Azure Data Lake Storage Gen2 Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
  • Java 1.8
  • Azure CLI

Note

See Using the Azure CLI with Azure Storage for more information about using the Azure CLI with Azure storage.

Install the Azure Data Lake Storage Gen2 Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen2-storage:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen2-storage:1.0.0-preview

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Azure Data Lake Storage Gen2 Sink Connector Configuration Properties.

Quick Start

In this quick start, the Azure Data Lake Storage Gen2 connector is used to export data produced by the Avro console producer to Azure Data Lake Storage Gen2.

Note

Before you begin: create an Azure Data Lake Storage Gen2 account and grant write access to the user completing these procedures. See Get started with Azure Data Lake Storage Gen2 using the Azure portal for additional information. Also see Service-to-service authentication with Azure Data Lake Storage using Azure Active Directory for information on setting up the account needed for the Azure Data Lake Storage Gen2 connector.

Install the connector through the Confluent Hub Client.

# run from your CP installation directory
confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen2-storage:latest

Tip

By default, the Confluent Hub client installs the plugin into share/confluent-hub-components and adds the directory to the plugin path. If this is the first connector you have installed, you may need to restart the Connect worker for the plugin path change to take effect. Also see Azure Data Lake Gen2 CLI for setting up and using the Azure CLI.

Start the services using the Confluent CLI.

confluent local start

Every service starts in order, printing a message with its status.

Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

Note

Make sure the Azure Data Lake Storage Gen2 connector has write access to the Azure Data Lake Storage Gen2 account named in azure.datalake.gen2.account.name, and that its credentials authenticate successfully.

To import a few records with a simple schema in Kafka, start the Avro console producer as follows:

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic datalake_topic \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Then, in the console producer, enter the following:

{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
{"f1": "value4"}
{"f1": "value5"}
{"f1": "value6"}
{"f1": "value7"}
{"f1": "value8"}
{"f1": "value9"}

The nine records entered are published to the Kafka topic datalake_topic in Avro format.

Create a datalake.properties file with the following contents:

name=adls-gen2-sink
connector.class=io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector
tasks.max=1
topics=datalake_topic
flush.size=3
azure.datalake.gen2.client.id=<your client id>
azure.datalake.gen2.client.key=<your client key>
azure.datalake.gen2.account.name=<your account name>
azure.datalake.gen2.token.endpoint=https://login.microsoftonline.com/<tenant-id>/oauth2/token
format.class=io.confluent.connect.azure.storage.format.avro.AvroFormat
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

Before starting the connector, make sure that the settings in datalake.properties match your Azure Data Lake Storage Gen2 setup. For this example, check the following settings:

  • azure.datalake.gen2.account.name points to your Data Lake Storage account.
  • azure.datalake.gen2.client.id is set to your client ID.
  • azure.datalake.gen2.client.key is set to the corresponding client secret.

The user ID or client ID must have permission to write to the Azure Data Lake Storage Gen2 account. Finally, set azure.datalake.gen2.token.endpoint to the OAuth 2.0 token endpoint as described in the Azure documentation, and use the v1 token endpoint. Then start the Azure Data Lake Storage Gen2 connector by loading its configuration with the following command.
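Before you load the connector, a quick local check can catch placeholders you forgot to replace. The following Python sketch parses a simple key=value properties file. It does not handle every Java .properties feature, such as line continuations. It flags required values that are missing or still contain <...> placeholders. It also checks that the token endpoint has the v1 form https://login.microsoftonline.com/<tenant-id>/oauth2/token. The helper names are hypothetical, and the list of required keys is an assumption drawn from the Quick Start configuration.

```python
import re

# Assumed required keys, taken from the Quick Start datalake.properties.
REQUIRED_KEYS = [
    "azure.datalake.gen2.client.id",
    "azure.datalake.gen2.client.key",
    "azure.datalake.gen2.account.name",
    "azure.datalake.gen2.token.endpoint",
]
# v1 endpoint shape; the tenant segment must not be a <placeholder>.
V1_ENDPOINT = re.compile(r"^https://login\.microsoftonline\.com/[^/<>]+/oauth2/token$")


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # or ! comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def find_problems(props):
    """Return human-readable problems found in the connector settings."""
    problems = []
    for key in REQUIRED_KEYS:
        value = props.get(key, "")
        if not value:
            problems.append(f"{key} is missing")
        elif "<" in value and ">" in value:
            problems.append(f"{key} still contains a placeholder")
    endpoint = props.get("azure.datalake.gen2.token.endpoint", "")
    if endpoint and not V1_ENDPOINT.match(endpoint):
        problems.append("token endpoint is not a v1 .../oauth2/token URL")
    return problems


sample = """
name=adls-gen2-sink
azure.datalake.gen2.client.id=<your client id>
azure.datalake.gen2.client.key=secret
azure.datalake.gen2.account.name=myaccount
azure.datalake.gen2.token.endpoint=https://login.microsoftonline.com/<tenant-id>/oauth2/token
"""
for problem in find_problems(parse_properties(sample)):
    print(problem)
```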

Caution

You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local load adls-gen2-sink -- -d datalake.properties
{
  "name":"adls-gen2-sink",
  "config":{
    "name":"adls-gen2-sink",
    "connector.class":"io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
    "tasks.max":"1",
    "topics":"datalake_topic",
    "flush.size":"3",
    "azure.datalake.gen2.client.id":"<your client id>",
    "azure.datalake.gen2.client.key":"<your client key>",
    "azure.datalake.gen2.account.name":"<your account name>",
    "azure.datalake.gen2.token.endpoint":"https://login.microsoftonline.com/<tenant-id>/oauth2/token",
    "format.class":"io.confluent.connect.azure.storage.format.avro.AvroFormat",
    "confluent.topic.bootstrap.servers":"localhost:9092",
    "confluent.topic.replication.factor":"1"
  },
  "tasks":[]
}
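The JSON printed above is the properties file expressed as a Connect connector configuration: a name plus a config map in which every value is a string. The following Python sketch builds the same shape from key=value lines. It only illustrates the mapping and assumes simple properties syntax. It is not a reimplementation of the Confluent CLI, and the function name to_connector_json is hypothetical.

```python
import json


def to_connector_json(properties_text):
    """Build a {"name": ..., "config": {...}} payload from simple key=value lines."""
    config = {}
    for line in properties_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("="
        )
        config[key.strip()] = value.strip()
    # Values stay strings, matching the output above (e.g. "tasks.max":"1").
    return {"name": config["name"], "config": config}


props = """
name=adls-gen2-sink
tasks.max=1
topics=datalake_topic
flush.size=3
"""
print(json.dumps(to_connector_json(props), indent=2))
```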

Check that the connector started successfully. Review the Connect worker’s log by entering the following:

confluent local log connect

Towards the end of the log you should see that the connector starts, logs a few messages, and then uploads data from Kafka to Azure Data Lake Storage Gen2.

Once the connector has ingested some records, check that the data is available in Azure Data Lake Storage Gen2. Use the following Azure CLI command:

az storage blob list --account-name <Your Storage Account Name> --container-name topics --account-key <Your Account Key>

In the JSON response, you should see three objects with the following names:

datalake_topic/partition=0/datalake_topic+0+0000000000.avro
datalake_topic/partition=0/datalake_topic+0+0000000003.avro
datalake_topic/partition=0/datalake_topic+0+0000000006.avro

Each file is encoded as <topic>+<kafkaPartition>+<startOffset>.<format>.
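Each file name includes its start offset, so you can tell which records a file holds without opening it. The following Python sketch recovers the topic, Kafka partition, start offset, and extension from a path. It assumes the <topic>+<kafkaPartition>+<startOffset>.<format> pattern shown above, and the helper name parse_file_name is hypothetical. Kafka topic names cannot contain "+", so the separators are unambiguous. The extension may have several parts, such as bin.gz when compression is enabled.

```python
import re

# <topic>+<kafkaPartition>+<startOffset>.<format>
FILE_NAME = re.compile(r"^(?P<topic>.+)\+(?P<partition>\d+)\+(?P<offset>\d+)\.(?P<ext>.+)$")


def parse_file_name(path):
    """Return (topic, partition, start_offset, extension) for a connector file."""
    name = path.rsplit("/", 1)[-1]  # drop directories such as partition=0/
    match = FILE_NAME.match(name)
    if match is None:
        raise ValueError(f"unexpected file name: {name}")
    return match["topic"], int(match["partition"]), int(match["offset"]), match["ext"]


print(parse_file_name("datalake_topic/partition=0/datalake_topic+0+0000000006.avro"))
```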

To verify the contents, copy each file from Azure Data Lake Storage Gen2 to your local filesystem with the following Azure CLI command. Change the --file destination to suit your environment:

az storage blob download --container-name topics --name datalake_topic/partition=0/datalake_topic+0+0000000000.avro --file datalake_topic+0+0000000000.avro --account-name <Your Storage Account Name> --account-key <Your Account Key>

Use avro-tools-1.9.0.jar (available in Apache mirrors) to print the records.

java -jar avro-tools-1.9.0.jar tojson datalake_topic+0+0000000000.avro

For the file above, you should see the following output:

{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}

The rest of the records are contained in the other two files.

Finally, stop the Connect worker and all other Confluent services by running:

confluent local stop

Your output should resemble:

Stopping control-center
control-center is [DOWN]
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]

You can stop all services and remove any data generated during this quick start by entering the following command:

confluent local destroy

Your output should resemble:

Stopping control-center
control-center is [DOWN]
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE

Exactly-once delivery

The Azure Data Lake Storage Gen2 connector is able to provide exactly-once semantics to consumers of the objects it exports to Azure Data Lake Storage Gen2, if the connector is supplied with a deterministic partitioner.

Of the currently available partitioners, the default and field partitioners are always deterministic. TimeBasedPartitioner can be deterministic with some configurations, as discussed below. With any of these deterministic partitioners, file splitting always happens at the same offsets for a given set of Kafka records. These partitioners take flush.size and schema.compatibility into account to decide when to roll and save a new file to Azure Data Lake Storage Gen2. The connector always delivers files in Azure Data Lake Storage Gen2 that contain the same records, even in the presence of failures.

If a connector task fails before an upload completes, the file remains in the temp/ folder. If a failure occurs after the upload has completed but before the connector commits the corresponding offset to Kafka, the file is uploaded again. This re-upload is transparent to users of the Azure Data Lake Storage Gen2 folder. At any time, they have access to the same records that successful uploads have made available.
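The following Python sketch models why a re-upload after a failure is invisible to readers. With a deterministic partitioner, the same offsets always produce the same file names and contents, so the second upload simply overwrites identical data. A dictionary stands in for the file system, and the upload_chunks helper is hypothetical. The model omits the temp/ staging folder that the connector uses for in-progress files.

```python
def upload_chunks(store, topic, partition, records, flush_size, start_offset=0):
    """Write full chunks of `records` into `store`, keyed by a deterministic path."""
    for i in range(0, len(records) - flush_size + 1, flush_size):
        offset = start_offset + i
        path = f"{topic}/partition={partition}/{topic}+{partition}+{offset:010d}.avro"
        store[path] = tuple(records[i:i + flush_size])  # same key, same content


records = [f"value{n}" for n in range(1, 10)]
store = {}
upload_chunks(store, "datalake_topic", 0, records, flush_size=3)
snapshot = dict(store)

# Simulate a crash after the uploads but before the offsets were committed:
# the task restarts from offset 0 and uploads the same chunks again.
upload_chunks(store, "datalake_topic", 0, records, flush_size=3)
print(store == snapshot)
```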

To guarantee exactly-once semantics with the TimeBasedPartitioner, the connector must be configured to use a deterministic implementation of TimestampExtractor and a deterministic rotation strategy. The deterministic timestamp extractors are Kafka records (timestamp.extractor=Record) or record fields (timestamp.extractor=RecordField). The deterministic rotation strategy configuration is rotate.interval.ms (setting rotate.schedule.interval.ms is nondeterministic and will invalidate exactly-once guarantees).
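The following Python sketch shows why rotate.interval.ms combined with record timestamps is deterministic. It finds file boundaries from the record timestamps alone, so replaying the same records always yields the same boundaries. By contrast, rotate.schedule.interval.ms depends on wall-clock time, so boundaries can differ between runs. The rule used here is a simplified assumption: a file is rolled once a record's timestamp is at least rotate.interval.ms past the timestamp of the file's first record. The model also numbers offsets from 0, and the function name is hypothetical.

```python
def rotation_start_offsets(timestamps_ms, rotate_interval_ms):
    """Return the offsets at which new files start under time-based rotation.

    Only record timestamps are consulted, never the current time.
    """
    starts = []
    base = None  # timestamp of the first record in the current file
    for offset, ts in enumerate(timestamps_ms):
        if base is None or ts - base >= rotate_interval_ms:
            starts.append(offset)
            base = ts
    return starts


# Record timestamps (milliseconds) for offsets 0..5.
timestamps = [1_000, 20_000, 59_000, 61_000, 70_000, 130_000]
first_run = rotation_start_offsets(timestamps, rotate_interval_ms=60_000)
replay = rotation_start_offsets(timestamps, rotate_interval_ms=60_000)
print(first_run, first_run == replay)
```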


Schema Evolution

The Azure Data Lake Storage Gen2 connector supports schema evolution and reacts to schema changes in data according to the schema.compatibility configuration. This section explains how the connector reacts to schema evolution under each value of schema.compatibility. The possible values are NONE, BACKWARD, FORWARD, and FULL, meaning no compatibility, backward compatibility, forward compatibility, and full compatibility, respectively.

  • NO Compatibility: By default, the schema.compatibility is set to NONE. In this case, the connector ensures that each file written to Azure Data Lake Storage Gen2 has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with new schema in new files.

  • BACKWARD Compatibility: If a schema is evolved in a backward compatible way, you can always use the latest schema to query all the data uniformly. For example, removing fields is a backward compatible change to a schema: when records written with the old schema that contain these fields are encountered, those fields can simply be ignored. Adding a field with a default value is also backward compatible.

    If BACKWARD is specified in schema.compatibility, the connector keeps track of the latest schema used in writing data to Azure Data Lake Storage Gen2. If a data record arrives with a schema version larger than the current latest schema, the connector commits the current set of files and writes the data record with the new schema to new files. For data records arriving later that use an earlier schema, the connector projects the data record to the latest schema before writing to the same set of files in Azure Data Lake Storage Gen2.

  • FORWARD Compatibility: If a schema is evolved in a forward compatible way, you can always use the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema uses the default value when the field is missing.

    If FORWARD is specified in the schema.compatibility, the connector projects the data to the oldest schema before writing to the same set of files in Azure Data Lake Storage Gen2.

  • FULL Compatibility: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema.

    If FULL is specified in the schema.compatibility, the connector performs the same action as BACKWARD.
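The following Python sketch is a simplified model of the decisions described above. Schema versions are plain integers, and for each record the model reports whether the connector writes to the current files, rolls to new files, or projects the record to a different schema. Real projection also rewrites the record's fields, which is omitted here. The FORWARD branch is an assumption: the mirror image of BACKWARD, tracking the oldest schema seen so far. The function name is hypothetical.

```python
def schema_actions(versions, compatibility):
    """Return a list of (version, action) pairs, one per record."""
    current = None  # schema version the open files are written with
    actions = []
    for version in versions:
        if current is None:
            action, current = "write", version
        elif version == current:
            action = "write"
        elif compatibility == "NONE":
            # Any change commits the current files and starts new ones.
            action, current = "roll", version
        elif compatibility in ("BACKWARD", "FULL"):
            if version > current:
                action, current = "roll", version
            else:
                action = f"project to v{current}"
        elif compatibility == "FORWARD":
            # Assumption: mirror image of BACKWARD, keeping the oldest schema.
            if version < current:
                action, current = "roll", version
            else:
                action = f"project to v{current}"
        else:
            raise ValueError(f"unknown schema.compatibility: {compatibility}")
        actions.append((version, action))
    return actions


for mode in ("NONE", "BACKWARD", "FORWARD"):
    print(mode, schema_actions([1, 2, 1], mode))
```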

Schema evolution in the Azure Data Lake Storage Gen2 connector works the same way as in the Kafka Connect S3 connector.

Write JSON message values into Azure Data Lake Storage Gen2

The example settings file is shown below:

name=adls-gen2-sink
connector.class=io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector
tasks.max=1
topics=datalake_topic
flush.size=100

# Required configuration
azure.datalake.gen2.client.id=<your client id>
azure.datalake.gen2.client.key=<your client key>

# The following define the information used to validate the license stored in Kafka
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092

The first few settings are common to most connectors. topics specifies the topics to export data from, in this case datalake_topic. The flush.size property specifies the number of records per partition the connector needs to write before completing an upload to Azure Data Lake Storage Gen2.

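The azure.datalake.gen2.client.id and azure.datalake.gen2.client.key properties are your required Azure credentials. This is a licensed Confluent connector. The confluent.license and confluent.topic.bootstrap.servers settings are used to validate the license stored in Kafka. For testing purposes, you can leave confluent.license empty during the 30-day trial period. For more information, see the License section. Next, add the account settings: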

azure.datalake.gen2.account.name=<your account name>
azure.datalake.gen2.token.endpoint=<your azure oauth2 token endpoint>

The next settings are specific to Azure Data Lake Storage Gen2. One mandatory setting is azure.datalake.gen2.account.name, the name of the Azure Data Lake Storage Gen2 account that hosts the exported Kafka records. Another mandatory setting is azure.datalake.gen2.token.endpoint, the URL the connector uses to authenticate access to your data lake.

format.class=io.confluent.connect.azure.storage.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

These class settings are required. format.class specifies the output file format, which is currently one of io.confluent.connect.azure.storage.format.avro.AvroFormat, io.confluent.connect.azure.storage.format.json.JsonFormat, or io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat. partitioner.class specifies the partitioner class.

schema.compatibility=NONE

Finally, schema evolution is disabled in this example by setting schema.compatibility to NONE, as explained above.

For detailed descriptions of all the available configuration options of the Azure Data Lake Storage Gen2 connector, see Azure Data Lake Storage Gen2 Sink Connector Configuration Properties.

Write raw message values into Azure Data Lake Storage Gen2

You can use the Azure Data Lake Storage Gen2 connector to write the unmodified original message values into newline-separated files in Azure Data Lake Storage Gen2. To do this, configure Kafka Connect so that it does not deserialize any of the messages, and configure the Azure Data Lake Storage Gen2 connector to store the message values in a binary format in Azure Data Lake Storage Gen2.

The first part of the Azure Data Lake Storage Gen2 connector configuration is similar to other examples.

name=datalake-raw-sink
connector.class=io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector
tasks.max=1
topics=datalake_topic
flush.size=3

The topics setting specifies the topics you want to export data from, which is datalake_topic in the example. The property flush.size specifies the number of records per partition the connector needs to write before completing an upload to Azure Data Lake Storage Gen2.

Next, configure the account name, authentication settings, and compression type.

azure.datalake.gen2.account.name=<your account name>
azure.datalake.gen2.token.endpoint=<your azure oauth2 token endpoint>
azure.datalake.gen2.client.id=<your client id>
azure.datalake.gen2.client.key=<your client key>
az.compression.type=gzip

The next settings are specific to Azure Data Lake Storage Gen2. One mandatory setting is azure.datalake.gen2.account.name, the account name of your Azure Data Lake Storage Gen2 account, which will host the exported Kafka records. Another mandatory setting is azure.datalake.gen2.token.endpoint, the URL the connector uses to authenticate access to your data lake. The azure.datalake.gen2.client.id and azure.datalake.gen2.client.key properties are your required Azure client credentials.

The az.compression.type specifies that the Azure Data Lake Storage Gen2 connector should compress all Azure Data Lake Storage Gen2 files using GZIP compression, adding the .gz extension to any files (see below).
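The following Python sketch shows how the final file extension is composed from the output format and az.compression.type. The .avro and .bin base extensions match this page. The .json extension for JsonFormat is an assumption. The mapping dictionary and the file_extension helper are hypothetical.

```python
BASE_EXTENSIONS = {
    "io.confluent.connect.azure.storage.format.avro.AvroFormat": ".avro",
    "io.confluent.connect.azure.storage.format.json.JsonFormat": ".json",  # assumed
    "io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat": ".bin",
}


def file_extension(format_class, compression="none"):
    """Return the file extension for a format class and az.compression.type value."""
    ext = BASE_EXTENSIONS[format_class]
    if compression == "gzip":
        ext += ".gz"  # the compression suffix follows the format extension
    return ext


print(file_extension("io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat", "gzip"))
```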

This example configuration is typical of most Azure Data Lake Storage Gen2 connectors. Now, configure the connector to read the raw message values and write them in binary format:

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat
schema.compatibility=NONE

The value.converter setting overrides the default value converter from the Connect worker configuration. ByteArrayConverter instructs Connect to skip deserializing the message values and to provide them in their raw binary form. The format.class setting instructs the Azure Data Lake Storage Gen2 connector to write these binary message values as-is into Azure Data Lake Storage Gen2 files.

By default, messages written to the same Azure Data Lake Storage Gen2 file are separated by a newline character sequence. You can change the separator with the format.bytearray.separator setting, which is worth doing if your messages might contain newlines. Also by default, the files written to Azure Data Lake Storage Gen2 have a .bin extension (before compression, if enabled). Use the format.bytearray.extension setting to change the pre-compression filename extension.
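The following Python sketch shows why the separator matters. It concatenates raw message values, each followed by a separator, and gzips the result, roughly as a .bin.gz file would be built. A value that itself contains a newline splits into two lines when read back, but a separator byte that never appears in the data round-trips cleanly. Two parts of the model are assumptions: that every value, including the last one, is followed by the separator, and that the record separator byte b"\x1e" is a suitable choice. The encode_chunk helper is hypothetical.

```python
import gzip


def encode_chunk(values, separator=b"\n", compress=True):
    """Concatenate raw message values, each followed by `separator`, then gzip."""
    body = b"".join(value + separator for value in values)
    return gzip.compress(body) if compress else body


values = [b'{"f1": "value1"}', b"plain text", b"line one\nline two"]

# With the default newline separator, the third message is split in two.
print(gzip.decompress(encode_chunk(values)).split(b"\n"))

# With a separator that never occurs in the data, every message round-trips.
custom = gzip.decompress(encode_chunk(values, separator=b"\x1e")).split(b"\x1e")[:-1]
print(custom == values)
```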

Next, you need to decide how you want to partition the consumed messages in Azure Data Lake Storage Gen2 files. You have a few options, including the default partitioner that preserves the same partitions as in Kafka:

partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

Or, you could partition using the timestamp of the Kafka messages.

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Record

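Or, you can partition using the time at which the Azure Data Lake Storage Gen2 connector processes each message. This wall-clock time is not a deterministic timestamp extractor, so this option does not provide the exactly-once guarantees described earlier.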

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Wallclock

Custom partitioners are always an option, too. Just be aware that since the record value is an opaque binary value, Connect cannot extract timestamps from fields using the RecordField option.

The Azure Data Lake Storage Gen2 connector configuration outlined above produces newline-delimited, gzipped files in Azure Data Lake Storage Gen2 with the .bin.gz extension.