Azure Data Lake Storage Gen2 Sink Connector for Confluent Platform
You can use the Azure Data Lake Storage Gen2 connector, currently available as a
sink connector, to export data from Apache Kafka® topics to Azure Data Lake Storage
Gen2 files in Avro, JSON, Parquet, or ByteArray formats.
Depending on your configuration, the Azure Data Lake Storage Gen2 connector can export data with exactly-once delivery semantics to consumers of the Azure Data Lake Storage Gen2 files it produces.
The Azure Data Lake Storage Gen2 sink connector periodically polls data from
Kafka and, in turn, uploads it to Azure Data Lake Storage Gen2. A partitioner is
used to split the data of every Kafka partition into chunks. Each chunk of data
is represented as an Azure Data Lake Storage Gen2 file. The key name encodes the
topic, the Kafka partition, and the start offset of this data chunk. If no
partitioner is specified in the configuration, the default partitioner which
preserves Kafka partitioning is used. The size of each data chunk is determined
by the number of records written to Azure Data Lake Storage Gen2 and by schema
compatibility.
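For example, with the default partitioner and the Avro format used in the quick start below, a chunk of topic datalake_topic starting at offset 0 of partition 0 is written to an object named as follows:
datalake_topic/partition=0/datalake_topic+0+0000000000.avro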
Note
This connector is not compatible with Azure Data Lake Storage Gen1.
Features
The Microsoft Azure Data Lake Storage Gen2 Sink Connector includes the following
features:
Exactly once delivery
Records that are exported using a deterministic partitioner are delivered with
exactly-once semantics. See the Exactly-once Semantics section for more
details.
Schema evolution
When schemas are used, the connector supports schema evolution based on schema compatibility modes. The available modes are NONE, BACKWARD, FORWARD, and FULL, and a selection can be made by setting the schema.compatibility property in the connector’s configuration. When the connector observes a schema change, it decides whether to roll the file or project the record to the proper schema according to the schema.compatibility configuration in use. See the Reaction to Schema Evolution section for more details.
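For example, to have the connector track the latest schema and project older records to it before writing, you would set the following property in the connector configuration:
schema.compatibility=BACKWARD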
Pluggable partitioner
The connector comes out of the box with partitioners that support default partitioning based on Apache Kafka® partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class. Additionally, you can customize time-based partitioning by extending the TimeBasedPartitioner class.
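As a sketch, the following connector properties enable field partitioning; the partitioner class lives in the same io.confluent.connect.storage.partitioner package as the other partitioners referenced in this document, and the field name region is a purely hypothetical example:
# partition objects by a field of the record value (hypothetical field name)
partitioner.class=io.confluent.connect.storage.partitioner.FieldPartitioner
partition.field.name=region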
Note
This connector is not compatible with SMTs that change the name of the
topic.
Prerequisites
The following are required to run the Kafka Connect Azure Data Lake Storage
Gen2 Sink Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
- Java 1.8
- Azure CLI
Install the Azure Data Lake Storage Gen2 Connector
You can install this connector by using the Confluent Hub client installation
instructions or by manually
downloading the ZIP file.
Prerequisites
Note
You must install the connector on every machine where Connect will run.
- An install of the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- An install of the latest (latest) connector version.
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen2-storage:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen2-storage:1.0.0-preview
Quick Start
In this quick start, the Azure Data Lake Storage Gen2 connector is used to export
data produced by the Avro console producer to Azure Data Lake Storage Gen2.
Install the connector through the Confluent Hub Client.
# run from your CP installation directory
confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen2-storage:latest
Tip
By default, it will install the plugin into share/confluent-hub-components and add the directory to the plugin path. If this is the first connector you have installed, you may need to restart the Connect worker for the plugin path change to take effect. Also see Azure Data Lake Gen2 CLI for setting up and using the Azure CLI.
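If the Azure CLI is newly installed, a quick way to confirm that it is set up and signed in before continuing is to run the standard login and account commands:
# sign in and confirm the active subscription and tenant
az login
az account show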
Start the services using the Confluent CLI.
confluent local services start
Every service starts in order, printing a message with its status.
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
Note
Ensure the Azure Data Lake Storage Gen2 connector has write access to the Azure Data Lake Storage Gen2 account shown in azure.datalake.gen2.account.name and can deploy credentials successfully.
To import a few records with a simple schema into Kafka, start the Avro console producer as follows:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic datalake_topic \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Then, in the console producer, enter the following:
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
{"f1": "value4"}
{"f1": "value5"}
{"f1": "value6"}
{"f1": "value7"}
{"f1": "value8"}
{"f1": "value9"}
The nine records entered are published to the Kafka topic datalake_topic in Avro format.
Create a datalake.properties file with the following contents:
name=adls-gen2-sink
connector.class=io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector
tasks.max=1
topics=datalake_topic
flush.size=3
azure.datalake.gen2.client.id=<your client id>
azure.datalake.gen2.client.key=<your client key>
azure.datalake.gen2.account.name=<your account name>
azure.datalake.gen2.token.endpoint=https://login.microsoftonline.com/<tenant-id>/oauth2/token
format.class=io.confluent.connect.azure.storage.format.avro.AvroFormat
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Before starting the connector, make sure that the configurations in datalake.properties are properly set to your Azure Data Lake Storage Gen2 configuration. For this example, make sure that azure.datalake.gen2.account.name points to your Data Lake store, azure.datalake.gen2.client.id is set to your user ID, and azure.datalake.gen2.client.key is set to your user’s secret key. The user ID or client ID should have permission to write to the Azure Data Lake Storage Gen2 account. Finally, set azure.datalake.gen2.token.endpoint to the OAuth 2.0 endpoint as described here, and use the v1 token endpoint.
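If you do not know your tenant ID for the token endpoint, you can look it up with the Azure CLI and substitute it into the endpoint format used in datalake.properties above:
az account show --query tenantId --output tsv
# use the value in: https://login.microsoftonline.com/<tenant-id>/oauth2/token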
Then start the Azure Data Lake Storage Gen2 connector by loading its configuration with the following command.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load datalake-sink --config datalake.properties
{
"name": "adls-gen2-sink",
"config": {
"name":"adls-gen2-sink",
"connector.class":"io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
"tasks.max":"1",
"topics":"datalake_topic",
"flush.size":"3",
"azure.datalake.gen2.client.id":"<your client id>",
"azure.datalake.gen2.client.key":"<your client key>",
"azure.datalake.gen2.account.name":"<your account name>",
"azure.datalake.gen2.token.endpoint":"https://login.microsoftonline.com/<tenant-id>/oauth2/token",
"format.class":"io.confluent.connect.azure.storage.format.avro.AvroFormat",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1"
},
"tasks": []
}
Check that the connector started successfully. Review the Connect worker’s
log by entering the following:
confluent local services connect log
Towards the end of the log you should see that the connector starts, logs a few
messages, and then uploads data from Kafka to Azure Data Lake Storage Gen2.
Once the connector has ingested some records, check that the data is available
in Azure Data Lake Storage Gen2. Use the following Azure CLI command:
az storage blob list --account-name <Your Storage Account Name> --container-name topics --account-key <Your Account Key>
In the JSON response, you should see three objects with the following keys:
datalake_topic/partition=0/datalake_topic+0+0000000000.avro
datalake_topic/partition=0/datalake_topic+0+0000000003.avro
datalake_topic/partition=0/datalake_topic+0+0000000006.avro
Each file name is encoded as <topic>+<kafkaPartition>+<startOffset>.<format>.
To verify the contents, copy each file from Azure Data Lake Storage Gen2 to your local filesystem. Use the following Azure CLI command, changing the destination to whatever makes sense for you:
az storage blob download --container-name topics --name datalake_topic/partition=0/datalake_topic+0+0000000000.avro --file datalake_topic+0+0000000000.avro --account-name <Your Storage Account Name> --account-key <Your Account Key>
Use avro-tools-1.9.0.jar (available at the Apache Avro site) to print the records.
java -jar avro-tools-1.9.0.jar tojson datalake_topic+0+0000000000.avro
For the file above, you should see the following output:
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
The rest of the records are contained in the other two files.
Finally, stop the Connect worker and all other Confluent services by running:
confluent local services stop
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
You can stop all services and remove any data generated during this quick start by entering the following command:
confluent local destroy
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE
Exactly-once Semantics
The Azure Data Lake Storage Gen2 connector is able to provide exactly-once semantics to consumers of the objects it exports to Azure Data Lake Storage Gen2, provided that the connector is supplied with a deterministic partitioner.
Currently, of the available partitioners, the default and field partitioners are always deterministic. TimeBasedPartitioner can be deterministic with some configurations, as discussed below. This implies that when any of these partitioners is used, file splitting always happens at the same offsets for a given set of Kafka records. These partitioners take into account flush.size and schema.compatibility to decide when to roll and save a new file to Azure Data Lake Storage Gen2. The connector always delivers files in Azure Data Lake Storage Gen2 that contain the same records, even in the presence of failures.
If a connector task fails before an upload completes, the file remains in the temp/ folder. If, on the other hand, a failure occurs after the upload has completed but before the corresponding offset is committed to Kafka by the connector, a re-upload takes place. However, this type of re-upload is transparent to the user of the Azure Data Lake Storage Gen2 folder, who at any time has access to the same records eventually made available by successful uploads to Azure Data Lake Storage Gen2.
To guarantee exactly-once semantics with the TimeBasedPartitioner, the connector must be configured to use a deterministic implementation of TimestampExtractor and a deterministic rotation strategy. The deterministic timestamp extractors are Kafka record timestamps (timestamp.extractor=Record) or record fields (timestamp.extractor=RecordField). The deterministic rotation strategy configuration is rotate.interval.ms (setting rotate.schedule.interval.ms is nondeterministic and will invalidate exactly-once guarantees).
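For example, the following sketch of a deterministic time-based configuration combines the record timestamp extractor with rotate.interval.ms (the interval value is illustrative):
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Record
rotate.interval.ms=600000
# do not set rotate.schedule.interval.ms if you rely on exactly-once guarantees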
Reaction to Schema Evolution
The Azure Data Lake Storage Gen2 connector supports schema evolution and reacts to schema changes of data according to the schema.compatibility configuration. This section explains how the connector reacts to schema evolution under different values of schema.compatibility. The schema.compatibility property can be set to NONE, BACKWARD, FORWARD, or FULL, which means NO compatibility, BACKWARD compatibility, FORWARD compatibility, and FULL compatibility, respectively.
NO Compatibility: By default, the schema.compatibility is set to NONE. In this case, the connector ensures that each file written to Azure Data Lake Storage Gen2 has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with the new schema in new files.
BACKWARD Compatibility: If a schema is evolved in a backward compatible
way, the connector can always use the latest schema to query all the data
uniformly. For example, removing fields is a backward compatible change to a
schema, since when the connector encounters records written with the old
schema that contain these fields, the connector can just ignore them. Adding a
field with a default value is also backward compatible.
If BACKWARD is specified in schema.compatibility, the connector keeps track of the latest schema used in writing data to Azure Data Lake Storage Gen2. If a data record with a schema version larger than the current latest schema arrives, the connector commits the current set of files and writes the data record with the new schema to new files. For data records arriving at a later time that use an earlier schema, the connector projects the data record to the latest schema before writing to the same set of files in Azure Data Lake Storage Gen2.
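For instance, a backward-compatible evolution of the quick start schema adds a field with a default value (an illustrative schema, not part of the quick start); records written with the original schema are projected to this latest schema by filling in the default:
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name":"f2","type":"string","default":""}]}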
FORWARD Compatibility: If a schema is evolved in a forward compatible way,
the connector can always use the oldest schema to query all the data
uniformly. Removing a field that had a default value is forward compatible,
since the old schema will use the default value when the field is missing.
If FORWARD is specified in schema.compatibility, the connector projects the data to the oldest schema before writing to the same set of files in Azure Data Lake Storage Gen2.
FULL Compatibility: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema.
If FULL is specified in schema.compatibility, the connector performs the same action as BACKWARD.
Schema evolution in the Azure Data Lake Storage Gen2 connector works the same way
as in the Kafka Connect S3 connector.
Write JSON message values into Azure Data Lake Storage Gen2
The example settings file is shown below:
name=adls-gen2-sink
connector.class=io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector
tasks.max=1
topics=datalake_topic
flush.size=100
# Required configuration
azure.datalake.gen2.client.id=<your client id>
azure.datalake.gen2.client.key=<your client key>
# The following define the information used to validate the license stored in Kafka
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
The first few settings are common to most connectors. topics specifies the topics to export data from, in this case datalake_topic. The property flush.size specifies the number of records per partition the connector needs to write before completing a multi-block upload to Azure Data Lake Storage Gen2.
The azure.datalake.gen2.client.id and azure.datalake.gen2.client.key properties are your required Azure credentials. This is a licensed Confluent connector; enter the following for testing purposes. For more on licensing, see the Azure Data Lake Storage Gen2 Licensing section.
azure.datalake.gen2.account.name=<your account name>
azure.datalake.gen2.token.endpoint=<your azure oauth2 token endpoint>
The next settings are specific to Azure Data Lake Storage Gen2. A mandatory setting is the name of your Azure Data Lake Gen2 store/account, azure.datalake.gen2.account.name, which hosts the exported Kafka records. Another mandatory configuration setting is azure.datalake.gen2.token.endpoint. The connector authenticates access to your data lake using this URL.
format.class=io.confluent.connect.azure.storage.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
These class settings are required to specify the output file format, which is currently io.confluent.connect.azure.storage.format.avro.AvroFormat, io.confluent.connect.azure.storage.format.json.JsonFormat, io.confluent.connect.azure.storage.format.parquet.ParquetFormat, or io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat, and the partitioner class.
schema.compatibility=NONE
Finally, schema evolution is disabled in this example by setting schema.compatibility to NONE, as explained above.
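Putting the settings above together, a complete datalake.properties for JSON output might look like the following sketch; the connector name is arbitrary, the placeholders are yours to fill in, and confluent.license is left empty for a trial:
name=adls-gen2-json-sink
connector.class=io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector
tasks.max=1
topics=datalake_topic
flush.size=100
azure.datalake.gen2.client.id=<your client id>
azure.datalake.gen2.client.key=<your client key>
azure.datalake.gen2.account.name=<your account name>
azure.datalake.gen2.token.endpoint=<your azure oauth2 token endpoint>
format.class=io.confluent.connect.azure.storage.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1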
For detailed descriptions of all the available configuration options of the Azure Data Lake Storage Gen2 connector, see Azure Data Lake Storage Gen2 Sink Connector Configuration Properties.
Write raw message values into Azure Data Lake Storage Gen2
It is possible to use the Azure Data Lake Storage Gen2 connector to write out the
unmodified original message values into newline-separated files in Azure Data
Lake Storage Gen2. To accomplish this, configure Kafka Connect so it does not deserialize any of the messages, and configure the Azure Data Lake Storage Gen2 connector to store the message values in a binary format in Azure Data Lake Storage Gen2.
The first part of the Azure Data Lake Storage Gen2 connector configuration is
similar to other examples.
name=datalake-raw-sink
connector.class=io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector
tasks.max=1
topics=datalake_topic
flush.size=3
The topics setting specifies the topics you want to export data from, which is datalake_topic in the example. The property flush.size specifies the number of records per partition the connector needs to write before completing an upload to Azure Data Lake Storage Gen2.
Next, configure the account name, credentials, token endpoint, and compression type.
azure.datalake.gen2.account.name=<your account name>
azure.datalake.gen2.token.endpoint=<your azure oauth2 token endpoint>
azure.datalake.gen2.client.id=<your client id>
azure.datalake.gen2.client.key=<your client key>
az.compression.type=gzip
The next settings are specific to Azure Data Lake Storage Gen2. A mandatory setting is the account name of your Azure Data Lake Gen2 store, azure.datalake.gen2.account.name, which will host the exported Kafka records. Another mandatory configuration setting is azure.datalake.gen2.token.endpoint. The connector authenticates access to your data lake using this URL. The azure.datalake.gen2.client.id and azure.datalake.gen2.client.key properties are your required Azure client credentials.
The az.compression.type setting specifies that the Azure Data Lake Storage Gen2 connector should compress all Azure Data Lake Storage Gen2 files using GZIP compression, adding the .gz extension to any files (see below).
This example configuration is typical of most Azure Data Lake Storage Gen2
connectors. Now, configure the connector to read the raw message values and
write them in binary format:
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat
schema.compatibility=NONE
The value.converter setting overrides the connector default in the Connect worker configuration. ByteArrayConverter is used to instruct Connect to skip deserializing the message values and instead provide the message values in their raw binary form. The format.class setting is used to instruct the Azure Data Lake Storage Gen2 connector to write these binary message values as-is into Azure Data Lake Storage Gen2 files. By default, the messages written to the same Azure Data Lake Storage Gen2 file are separated by a newline character sequence, but you can control this with the format.bytearray.separator setting. You may want to consider setting this if your messages might contain newlines. Also, by default the files written to Azure Data Lake Storage Gen2 have an extension of .bin (before compression, if enabled), but you can use the format.bytearray.extension setting to change the pre-compression filename extension.
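For example, if your raw messages can themselves contain newline characters, you might choose a different delimiter and a custom pre-compression extension (both values below are illustrative):
format.bytearray.separator=;;;
format.bytearray.extension=.data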
Next, you need to decide how you want to partition the consumed messages in Azure
Data Lake Storage Gen2 files. You have a few options, including the default
partitioner that preserves the same partitions as in Kafka:
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
Or, you could partition using the timestamp of the Kafka messages.
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Record
Or, you can use the timestamp at which the Azure Data Lake Storage Gen2 connector processes each message.
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Wallclock
Custom partitioners are always an option, too. Just be aware that since the
record value is an opaque binary value, Connect cannot extract timestamps
from fields using the RecordField
option.
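If you go with one of the time-based partitioners above, keep in mind that the storage partitioners also expect a few additional settings; the values in this sketch are illustrative:
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en-US
timezone=UTC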
The Azure Data Lake Storage Gen2 connector configuration outlined above results in newline-delimited, gzipped objects in Azure Data Lake Storage Gen2 with a .bin.gz extension.
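Assembled end to end, the raw-bytes configuration described in this section might look like the following sketch (placeholders are yours to fill in; the partitioner line uses the default option discussed above):
name=datalake-raw-sink
connector.class=io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector
tasks.max=1
topics=datalake_topic
flush.size=3
azure.datalake.gen2.account.name=<your account name>
azure.datalake.gen2.token.endpoint=<your azure oauth2 token endpoint>
azure.datalake.gen2.client.id=<your client id>
azure.datalake.gen2.client.key=<your client key>
az.compression.type=gzip
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat
schema.compatibility=NONE
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1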