Azure Data Lake Storage Gen2 Sink Connector for Confluent Platform¶
You can use the Azure Data Lake Storage Gen2 connector, currently available as a
sink connector, to export data from Apache Kafka® topics to Azure Data Lake Storage
Gen2 files in Avro, JSON, Parquet, or ByteArray format.
Depending on your configuration, the Azure Data Lake Storage Gen2 connector can
export data with exactly-once delivery semantics to consumers of the
Azure Data Lake Storage Gen2 files it produces.
The Azure Data Lake Storage Gen2 sink connector periodically polls data from Kafka and, in turn, uploads it to Azure Data Lake Storage Gen2. A partitioner is used to split the data of every Kafka partition into chunks. Each chunk of data is represented as an Azure Data Lake Storage Gen2 file. The key name encodes the topic, the Kafka partition, and the start offset of this data chunk. If no partitioner is specified in the configuration, the default partitioner which preserves Kafka partitioning is used. The size of each data chunk is determined by the number of records written to Azure Data Lake Storage Gen2 and by schema compatibility.
Note that this connector isn’t compatible with Azure Data Lake Storage Gen1.
Features¶
The Microsoft Azure Data Lake Storage Gen2 Sink connector includes the following features:
- Exactly once delivery
- Dead Letter Queue
- Multiple tasks
- Pluggable data format with or without schema
- Schema evolution
- Pluggable partitioner
Exactly once delivery¶
Records that are exported using a deterministic partitioner are delivered with exactly-once semantics. See the Exactly-once Semantics section for more details.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Multiple tasks¶
The Azure Data Lake Storage Gen2 Sink connector supports running one or more
tasks. You can specify the number of tasks in the tasks.max
configuration
parameter. Multiple tasks may improve performance when moving a large amount of
data.
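For example, a connector configuration might allow up to four tasks with the fragment below. This is only an illustrative sketch; the value 4 is an assumption and should be chosen based on the number of topic partitions and your throughput needs.
tasks.max=4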
Pluggable data format with or without schema¶
Out of the box, the connector supports writing data to Azure Data Lake Storage
Gen2 in Avro, Parquet, and JSON formats. Besides records with schema, the
connector supports exporting plain JSON and byte array records without schema
in text files, one record per line. In general, the connector may accept any
format that provides an implementation of the Format interface.
Schema evolution¶
When schemas are used, the connector supports schema evolution based on schema
compatibility modes. The available modes are NONE, BACKWARD, FORWARD, and FULL,
and a selection can be made by setting the schema.compatibility property in the
connector's configuration. When the connector observes a schema change, it
decides whether to roll the file or project the record to the proper schema
according to the schema.compatibility configuration in use. See the Reaction to
Schema Evolution section for more details.
Pluggable partitioner¶
The connector comes out of the box with partitioners that support default
partitioning based on Kafka partitions, field partitioning, and time-based
partitioning in days or hours. You may implement your own partitioners by
extending the Partitioner
class. Additionally, you can customize time-based
partitioning by extending the TimeBasedPartitioner
class.
Note that this connector is not compatible with SMTs that change the name of the topic.
Limitations¶
The connector does not currently support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:
io.debezium.transforms.ByLogicalTableRouter
io.debezium.transforms.outbox.EventRouter
org.apache.kafka.connect.transforms.RegexRouter
org.apache.kafka.connect.transforms.TimestampRouter
io.confluent.connect.transforms.MessageTimestampRouter
io.confluent.connect.transforms.ExtractTopic$Key
io.confluent.connect.transforms.ExtractTopic$Value
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.
Configuration properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Azure Data Lake Storage Gen2 Sink Connector for Confluent Platform.
Install the Azure Data Lake Storage Gen2 connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.
Java 1.8.
Azure CLI. For more information about using the Azure CLI with Azure storage, see Using the Azure CLI with Azure Storage.
An installation of the latest (latest) connector version.

To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-azure-data-lake-gen2-storage:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-azure-data-lake-gen2-storage:1.6.13
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Mapping records to Azure Data Lake Storage Gen2 Objects¶
The Azure Data Lake Storage Gen2 Sink connector consumes records from the specified topics, organizes them into different partitions, and then opens a stream with Azure storage. The connector then commits the data and flushes it directly into the Azure Data Lake Storage Gen2 bucket. It uses Azure Data Lake Storage Gen2 object paths that include the Kafka topic and partition, the computed partition, and the filename. The Azure Data Lake Storage Gen2 Sink connector offers several ways to customize this behavior, including:
- Controlling the names of the Azure Data Lake Storage Gen2 objects
- Determining how records are partitioned into Azure Data Lake Storage Gen2 objects
- Choosing the format used to serialize sets of records into Azure Data Lake Storage Gen2 objects
- Controlling when to upload Azure Data Lake Storage Gen2 objects
Azure Data Lake Storage Gen2 object names¶
The Azure Data Lake Storage Gen2 data model is a flat structure: each bucket
stores objects, and the name of each Azure Data Lake Storage Gen2 object serves
as the unique key. However, a logical hierarchy can be inferred when the Azure
Data Lake Storage Gen2 object names use directory delimiters, such as /.
The Azure Data Lake Storage Gen2 Sink connector allows you to customize the names
of the Azure Data Lake Storage Gen2 objects it uploads to the Azure Data Lake
Storage Gen2 bucket.
In general, the names of the Azure Data Lake Storage Gen2 objects uploaded by the Azure Data Lake Storage Gen2 Sink connector follow this format:
<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
where:

- <prefix> is specified with the connector's topics.dir configuration property, which defaults to the literal value topics and helps create uniquely named Azure Data Lake Storage Gen2 objects that don't clash with existing Azure Data Lake Storage Gen2 objects in the same bucket.
- <topic> corresponds to the name of the Kafka topic from which the records in this Azure Data Lake Storage Gen2 object were read.
- <encodedPartition> is generated by the Azure Data Lake Storage Gen2 Sink connector's partitioner (see Partitioning records into Azure Data Lake Storage Gen2 objects).
- <kafkaPartition> is the Kafka partition number from which the records in this Azure Data Lake Storage Gen2 object were read.
- <startOffset> is the Kafka offset of the first record written to this Azure Data Lake Storage Gen2 object.
- <format> is the extension identifying the format in which the records are serialized in this Azure Data Lake Storage Gen2 object.

If desired, the / and + characters can be changed using the connector's directory.delim and file.delim configuration properties.
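For example, following the format above with the default topics.dir value of topics and the default partitioner, the first Avro file written for partition 0 of a topic named datalake_topic would have a name like the one below. This is an illustrative name, not output from a real run:

topics/datalake_topic/partition=0/datalake_topic+0+0000000000.avro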
Partitioning records into Azure Data Lake Storage Gen2 objects¶
The Azure Data Lake Storage Gen2 Sink connector’s partitioner determines how
records read from a Kafka topic are partitioned into Azure Data Lake Storage Gen2
objects. The partitioner determines the <encodedPartition>
portion of the
Azure Data Lake Storage Gen2 object names (see
Azure Data Lake Storage Gen2 object names).
The partitioner is specified in the connector configuration with the
partitioner.class
configuration property. The Azure Data Lake Storage Gen2
Sink connector comes with the following partitioners:
- Default Kafka Partitioner: The io.confluent.connect.storage.partitioner.DefaultPartitioner preserves the same topic partitions as in Kafka, and records from each topic partition ultimately end up in Azure Data Lake Storage Gen2 objects with names that include the Kafka topic and Kafka partitions. The <encodedPartition> is always <topicName>/partition=<kafkaPartition>, resulting in Azure Data Lake Storage Gen2 object names such as <prefix>/<topic>/partition=<kafkaPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>.
- Field Partitioner: The io.confluent.connect.storage.partitioner.FieldPartitioner determines the partition from the field within each record identified by the connector's partition.field.name configuration property, which has no default. This partitioner requires STRUCT record type values. The <encodedPartition> is always <topicName>/<fieldName>=<fieldValue>, resulting in Azure Data Lake Storage Gen2 Sink connector object names of the form <prefix>/<topic>/<fieldName>=<fieldValue>/<topic>+<kafkaPartition>+<startOffset>.<format>.
- Time Based Partitioner: The io.confluent.connect.storage.partitioner.TimeBasedPartitioner determines the partition from the year, month, day, hour, minutes, and/or seconds. This partitioner requires the following connector configuration properties:
  - The path.format configuration property specifies the pattern used for the <encodedPartition> portion of the Azure Data Lake Storage Gen2 object name. For example, when path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH, Azure Data Lake Storage Gen2 object names have the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>.
  - The partition.duration.ms configuration property defines the maximum granularity of the Azure Data Lake Storage Gen2 objects within a single encoded partition directory. For example, setting partition.duration.ms=600000 (10 minutes) results in each Azure Data Lake Storage Gen2 object in that directory having no more than 10 minutes of records.
  - The locale configuration property specifies the JDK's locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, and fr-FR for French (in France). These may vary by Java version; see the available locales.
  - The timezone configuration property specifies the timezone in which the dates and times are treated. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the "en_US" locale.
  - The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, or RecordField to extract the timestamp from one of the fields in the record's value, as specified by the timestamp.field configuration property.
- Daily Partitioner: The io.confluent.connect.storage.partitioner.DailyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd and partition.duration.ms=86400000 (one day, for one Azure Data Lake Storage Gen2 object in each daily directory). This partitioner always results in Azure Data Lake Storage Gen2 object names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/<topic>+<kafkaPartition>+<startOffset>.<format>. This partitioner requires the locale, timezone, and timestamp.extractor connector configuration properties described above for the Time Based Partitioner.
- Hourly Partitioner: The io.confluent.connect.storage.partitioner.HourlyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH and partition.duration.ms=3600000 (one hour, for one Azure Data Lake Storage Gen2 object in each hourly directory). This partitioner always results in Azure Data Lake Storage Gen2 object names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>. This partitioner also requires the locale, timezone, and timestamp.extractor connector configuration properties described above for the Time Based Partitioner.
As noted below, the choice of timestamp.extractor
affects whether the Azure
Data Lake Storage Gen2 Sink connector can support exactly once delivery.
You can also choose to use a custom partitioner by implementing the
io.confluent.connect.storage.partitioner.Partitioner
interface, packaging
your implementation into a JAR file, and then:
- Place the JAR file into the share/java/kafka-connect-azure-data-lake-gen2-storage directory of your Confluent Platform installation on each worker node.
- Restart all of the Connect worker nodes.
- Configure Azure Data Lake Storage Gen2 Sink connectors to use your fully-qualified partitioner class name.
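For illustration, a configuration fragment that selects hourly time-based partitioning keyed off the record timestamp might look like the following sketch. The property values shown are assumptions you would adjust for your environment:

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
partition.duration.ms=3600000
locale=en-US
timezone=UTC
timestamp.extractor=Record

With these settings, the <encodedPartition> takes the year=.../month=.../day=.../hour=... form described above.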
Azure Data Lake Storage Gen2 object formats¶
The Azure Data Lake Storage Gen2 Sink connector can serialize multiple records
into each Azure Data Lake Storage Gen2 object using a number of formats. The
connector’s format.class
configuration property identifies the name of the
Java class that implements the io.confluent.connect.storage.format.Format
interface, and the Azure Data Lake Storage Gen2 Sink connector comes with several
implementations:
- Avro: Use format.class=io.confluent.connect.azure.storage.format.avro.AvroFormat to write the Azure Data Lake Storage Gen2 object as an Avro container file that includes the Avro schema followed by one or more records. The connector's avro.codec configuration property specifies the Avro compression codec, and values can be null (the default) for no Avro compression, deflate to use the deflate algorithm as specified in RFC 1951, snappy to use Google's Snappy compression library, and bzip2 for BZip2 compression. Optionally set enhanced.avro.schema.support=true to enable enum symbol preservation and package name awareness.
- JSON: Use format.class=io.confluent.connect.azure.storage.format.json.JsonFormat to write the Azure Data Lake Storage Gen2 object as a file containing one JSON serialized record per line. The connector's az.compression.type configuration property can be set to none (the default) for no compression or gzip for GZip compression.
- Parquet: Use format.class=io.confluent.connect.azure.storage.format.parquet.ParquetFormat to write the Azure Data Lake Storage Gen2 object as a Parquet file, a columnar storage format. The connector's parquet.codec configuration property specifies the Parquet compression codec, and values can be snappy (the default) to use Google's Snappy compression library, none for no compression, gzip to use GNU's GZip compression library, lzo to use the LZO (Lempel–Ziv–Oberhumer) compression library, brotli to use Google's Brotli compression library, lz4 to use the BSD-licensed LZ4 compression library, and zstd to use Facebook's ZStandard compression library.

  Important: You must use the AvroConverter, ProtobufConverter, or JsonSchemaConverter with ParquetFormat for this connector. Attempting to use the JsonConverter (with or without schemas) results in a NullPointerException and a StackOverflowException.

- Raw Bytes: Use format.class=io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat to write the raw serialized record values, delimited with the JDK's line separator, to the Azure Data Lake Storage Gen2 object. This requires using value.converter=org.apache.kafka.connect.converters.ByteArrayConverter with the connector. Use a different delimiter by specifying the connector's format.bytearray.separator configuration property.
You can also choose to use a custom format by implementing the
io.confluent.connect.storage.format.Format interface, packaging your
implementation into a JAR file, and then:

- Place the JAR file into the share/java/kafka-connect-azure-data-lake-gen2-storage directory of your Confluent Platform installation on each worker node.
- Restart all of the Connect worker nodes.
- Configure Azure Data Lake Storage Gen2 Sink connectors with format.class set to the fully-qualified class name of your format implementation.
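As a sketch, a fragment that writes Snappy-compressed Parquet files with a schema-aware converter might look like the following. The Schema Registry URL is an assumption for a local installation:

format.class=io.confluent.connect.azure.storage.format.parquet.ParquetFormat
parquet.codec=snappy
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081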
Azure Data Lake Storage Gen2 object uploads¶
As the Azure Data Lake Storage Gen2 Sink connector processes each record, it uses the partitioner to determine the encoded partition to write the record to. This continues for each partition until the connector determines that a partition has enough records and should be flushed and uploaded to the Azure Data Lake Storage Gen2 bucket using the Azure Data Lake Storage Gen2 object name for that partition. This technique of knowing when to flush a partition file and upload it to Azure Data Lake Storage is called the rotation strategy, and there are a number of ways to control this behavior.
- Maximum number of records: The connector's flush.size configuration property specifies the maximum number of records that should be written to a single Azure Data Lake Storage Gen2 object. There is no default for this setting.

  Important: Rotation strategy logic: In the following rotation strategies, the logic to flush files to storage is triggered when a new record arrives, after the defined interval or scheduled interval time. Flushing files is also triggered periodically by the offset.flush.interval.ms setting defined in the Connect worker configuration. The offset.flush.interval.ms setting defaults to 60000 ms (60 seconds). If you enable the rotate.interval.ms or rotate.schedule.interval.ms properties and the ingestion rate is low, you should set offset.flush.interval.ms to a smaller value so that records flush at (or close to) the rotation interval. Leaving offset.flush.interval.ms at the default 60 seconds may cause records to stay in an open file for longer than expected if no new records arrive to trigger rotation.

- Maximum span of record time: In this rotation strategy, the connector's rotate.interval.ms property specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. The timestamp for each file starts with the record timestamp of the first record written to the file, as determined by the partitioner's timestamp.extractor. As long as the next record's timestamp fits within the timespan specified by the rotate.interval.ms property, the record is written to the file. If a record's timestamp does not fit within the timespan of rotate.interval.ms, the connector flushes the file, uploads it to Azure Data Lake Storage Gen2, and commits the offsets of the records in that file. After this, the connector creates a new file with a timespan that starts with the first record, and writes the first record to the file.

- Scheduled rotation: In this rotation strategy, the connector's rotate.schedule.interval.ms property specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. Unlike rotate.interval.ms, with scheduled rotation the timestamp for each file starts with the system time at which the first record is written to the file. You must configure the partitioner's timezone parameter (it defaults to an empty string) when using this property, otherwise the connector fails with an exception.

  As long as a record is processed within the timespan specified by rotate.schedule.interval.ms, the record is written to the file. As soon as a record is processed after the timespan for the current file, the file is flushed, uploaded to Azure Data Lake Storage Gen2, and the offsets of the records in the file are committed. A new file is created with a timespan that starts with the current system time, and the new record is written to the file. This configuration is useful when you have to commit your data based on current server time, for example at the beginning of every hour. The default value -1 means that this feature is disabled.

  Scheduled rotation uses rotate.schedule.interval.ms to close the file and upload it to Azure Data Lake Storage Gen2 on a regular basis using the current time, rather than the record time. Even if the connector has no more records to process, Connect will still call the connector at least every offset.flush.interval.ms, as defined in the Connect worker's configuration file. Every time this occurs, the connector uses the current time to determine whether the currently open file should be closed and uploaded to Azure Data Lake Storage Gen2.

These rotation strategies can be combined as needed:

- When using rotate.interval.ms, the connector only closes and uploads a file to Azure Data Lake Storage Gen2 when the next file does not belong based upon the timestamp. In other words, if the connector has no more records to process, the connector may keep the file open for a significant period of time, until the connector can process another record.
- When using rotate.schedule.interval.ms, the connector flushes the file based on the wallclock and does not wait for new records.
Note
Not all rotation strategies are compatible with the Azure Data Lake Storage Gen2 Sink connector’s ability to deliver Azure Data Lake Storage Gen2 objects exactly once with eventual consistency. See the Exactly-once Semantics section for details.
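As an illustration, a fragment that combines a record-count limit with both rotation strategies might look like the following sketch. All values are assumptions; remember that the partitioner timezone must be set when scheduled rotation is used:

flush.size=1000
rotate.interval.ms=600000
rotate.schedule.interval.ms=3600000
timezone=UTC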
Quick start¶
In this quick start, the Azure Data Lake Storage Gen2 connector is used to export data produced by the Avro console producer to Azure Data Lake Storage Gen2.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Note that before you begin, you should create an Azure Data Lake Storage Gen2 account and grant write access to the user completing these procedures. For more information, see Get started with Azure Data Lake Storage Gen2 using the Azure portal. Also, for information on setting up the account needed for the Azure Data Lake Storage Gen2 connector, see Service-to-service authentication with Azure Data Lake Storage using Azure Active Directory.
To get started, complete the following steps:
Install the connector through the Confluent Hub Client.
# run from your CP installation directory
confluent connect plugin install confluentinc/kafka-connect-azure-data-lake-gen2-storage:latest
Tip
By default, the connector will install the plugin into the share/confluent-hub-components directory and add the directory to the plugin path. For the plugin path change to take effect, restart the Connect worker. For details about setting up and using the Azure CLI, see Azure Data Lake Gen2 CLI.

Start the services using the Confluent CLI:
confluent local services start
Every service starts in order, printing a message with its status.
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
Note
Ensure the Azure Data Lake Storage Gen2 connector has write access to the Azure Data Lake Storage Gen2 account shown in azure.datalake.gen2.account.name and can deploy credentials successfully.

To import a few records with a simple schema into Kafka, start the Avro console producer as follows:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic datalake_topic \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
In the console producer, enter the following:
{"f1": "value1"} {"f1": "value2"} {"f1": "value3"} {"f1": "value4"} {"f1": "value5"} {"f1": "value6"} {"f1": "value7"} {"f1": "value8"} {"f1": "value9"} The nine records entered are published to the |ak| topic ``datalake_topic`` in Avro format.
Create a datalake.properties file with the following contents:

name=adls-gen2-sink
connector.class=io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector
tasks.max=1
topics=datalake_topic
flush.size=3
azure.datalake.gen2.client.id=<your client id>
azure.datalake.gen2.client.key=<your client key>
azure.datalake.gen2.account.name=<your account name>
azure.datalake.gen2.token.endpoint=https://login.microsoftonline.com/<tenant-id>/oauth2/token
format.class=io.confluent.connect.azure.storage.format.avro.AvroFormat
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Start the Azure Data Lake Storage Gen2 Sink connector.
Before starting the connector, ensure you do the following:
Ensure the configurations in datalake.properties are properly set to your Azure Data Lake Storage Gen2 configurations. For this example, ensure the following:

- azure.datalake.gen2.account.name points to your Data Lake store.
- azure.datalake.gen2.client.id is set to your user ID. Note that you can find your client ID in the Azure portal.
- azure.datalake.gen2.client.key is set to your user's secret key. Note that you can find your secret key in the Azure portal.

The user ID (or client ID) should have permission to write to the Azure Data Lake Storage Gen2 account.

Set azure.datalake.gen2.token.endpoint to the OAuth 2 endpoint as described in Step 4: Get the OAuth 2.0 token endpoint (only for Java-based applications), and use the V1 token endpoint.
Start the connector by loading its configuration with the following command:
confluent local services connect connector load datalake-sink --config datalake.properties
Configuration
{ "name": "adls-gen2-sink", "config": { "name":"adls-gen2-sink", "connector.class":"io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector", "tasks.max":"1", "topics":"datalake_topic", "flush.size":"3", "azure.datalake.gen2.client.id":"<your client id>", "azure.datalake.gen2.client.key":"<your client key>", "azure.datalake.gen2.account.name":"<your account name>", "azure.datalake.gen2.token.endpoint":"https://login.microsoftonline.com/<tenant-id>/oauth2/token", "format.class":"io.confluent.connect.azure.storage.format.avro.AvroFormat", "confluent.topic.bootstrap.servers":"localhost:9092", "confluent.topic.replication.factor":"1" }, "tasks": [] }
Check that the connector started successfully and review the Connect worker’s log by entering the following command:
confluent local services connect log
Towards the end of the log you should see that the connector starts, logs a few messages, and then uploads data from Kafka to Azure Data Lake Storage Gen2.
Once the connector has ingested some records, check that the data is available in Azure Data Lake Storage Gen2 by using the following Azure CLI command:
az storage blob list --account-name <Your Storage Account Name> --container-name topics --account-key <Your Account Key>
In the JSON response, you should see three objects with the following keys:

datalake_topic/partition=0/datalake_topic+0+0000000000.avro
datalake_topic/partition=0/datalake_topic+0+0000000003.avro
datalake_topic/partition=0/datalake_topic+0+0000000006.avro
Each file is encoded as <topic>+<kafkaPartition>+<startOffset>.<format>.

To verify the contents, copy each file from Azure Data Lake Storage Gen2 to your local filesystem. Use the following Azure CLI command, changing the destination to what makes sense for you:

az storage blob download --container-name topics --name datalake_topic/partition=0/datalake_topic+0+0000000000.avro --file datalake_topic+0+0000000000.avro --account-name <Your Storage Account Name> --account-key <Your Account Key>
Use avro-tools-1.9.0.jar (available at the Apache Avro site) to print the records:

java -jar avro-tools-1.9.0.jar tojson datalake_topic+0+0000000000.avro
For the file above, you should see the following output:
{"f1":"value1"} {"f1":"value2"} {"f1":"value3"}
The rest of the records are contained in the other two files.
Stop the Connect worker and all other Confluent services by running:
confluent local stop
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
You can stop all services and remove any data generated during this quick start by entering the following command:
confluent local destroy
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE
Exactly-once Semantics¶
The Azure Data Lake Storage Gen2 connector is able to provide exactly-once semantics to consumers of the objects it exports to Azure Data Lake Storage Gen2, if the connector is supplied with a deterministic partitioner.
Currently, out of the available partitioners, the default and field partitioners
are always deterministic. TimeBasedPartitioner
can be deterministic with
some configurations as discussed below. This implies that when any of these
partitioners is used, file splitting always happens at the same offsets for a
given set of Kafka records. These partitioners take into account flush.size
and schema.compatibility
to decide when to roll and save a new file to Azure
Data Lake Storage Gen2. The connector always delivers files in Azure Data Lake
Storage Gen2 that contain the same records, even in the presence of failures.
If a connector task fails before an upload completes, the file will still be in
the temp/ folder. If, on the other hand, a failure occurs after the upload
has completed, but before the corresponding offset is committed to Kafka by the
connector, then a re-upload will take place. However, this type of re-upload is
transparent to the user of the Azure Data Lake Storage Gen2 folder, who at any
time will have access to the same records made eventually available by
successful uploads to Azure Data Lake Storage Gen2.
To guarantee exactly-once semantics with the TimeBasedPartitioner
, the
connector must be configured to use a deterministic implementation of
TimestampExtractor
and a deterministic rotation strategy. The deterministic
timestamp extractors are Kafka records (timestamp.extractor=Record
) or record
fields (timestamp.extractor=RecordField
). The deterministic rotation
strategy configuration is rotate.interval.ms
(setting
rotate.schedule.interval.ms
is nondeterministic and will invalidate
exactly-once guarantees).
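For example, a deterministic time-based setup compatible with exactly-once delivery might use the following fragment. This is a sketch and the interval values are assumptions:

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Record
rotate.interval.ms=3600000
# path.format, partition.duration.ms, locale, and timezone are also required (see above)
# rotate.schedule.interval.ms is intentionally left unset: scheduled rotation is
# nondeterministic and would invalidate the exactly-once guarantees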
Reaction to Schema Evolution¶
The Azure Data Lake Storage Gen2 connector supports schema evolution and reacts
to schema changes of data according to the schema.compatibility
configuration. This section explains how the connector reacts to schema
evolution under different values of schema.compatibility. The
schema.compatibility property can be set to NONE, BACKWARD, FORWARD, or FULL,
which means NO compatibility, BACKWARD compatibility, FORWARD compatibility,
and FULL compatibility, respectively.
- NO Compatibility: By default, schema.compatibility is set to NONE. In this case, the connector ensures that each file written to Azure Data Lake Storage Gen2 has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with the new schema in new files.

- BACKWARD Compatibility: If a schema is evolved in a backward compatible way, the connector can always use the latest schema to query all the data uniformly. For example, removing fields is a backward compatible change to a schema, since when the connector encounters records written with the old schema that contain these fields, the connector can just ignore them. Adding a field with a default value is also backward compatible.

  If BACKWARD is specified in schema.compatibility, the connector keeps track of the latest schema used in writing data to Azure Data Lake Storage Gen2. If a data record with a schema version larger than the current latest schema arrives, the connector commits the current set of files and writes the data record with the new schema to new files. For data records arriving at a later time that use an earlier schema, the connector projects the data record to the latest schema before writing to the same set of files in Azure Data Lake Storage Gen2.

- FORWARD Compatibility: If a schema is evolved in a forward compatible way, the connector can always use the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema will use the default value when the field is missing.

  If FORWARD is specified in schema.compatibility, the connector projects the data to the oldest schema before writing to the same set of files in Azure Data Lake Storage Gen2.

- FULL Compatibility: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema.

  If FULL is specified in schema.compatibility, the connector performs the same action as BACKWARD.
Schema evolution in the Azure Data Lake Storage Gen2 connector works the same way as in the Kafka Connect S3 Sink connector.
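For example, to have the connector track the latest schema and project older records to it as described above, the configuration could hypothetically include:

schema.compatibility=BACKWARD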
Write JSON message values into Azure Data Lake Storage Gen2¶
The example settings file is shown below:
name=adls-gen2-sink
connector.class=io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector
tasks.max=1
topics=datalake_topic
flush.size=100
# Required configuration
azure.datalake.gen2.client.id=<your client id>
azure.datalake.gen2.client.key=<your client key>
# The following define the information used to validate the license stored in Kafka
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
The first few settings are common to most connectors. topics
specifies the
topics to export data from, in this case datalake_topic
. The property
flush.size
specifies the number of records per partition the connector needs
to write to before completing a multiblock upload to Azure Data Lake Storage
Gen2.
The azure.datalake.gen2.client.id
and azure.datalake.gen2.client.key
are
your required Azure credentials. This is a licensed Confluent connector. Enter
the following for testing purposes. For more on this, see the Azure Data Lake
Storage Gen2 Licensing section.
azure.datalake.gen2.account.name=<your account name>
azure.datalake.gen2.token.endpoint=<your azure oauth2 token endpoint>
The next settings are specific to Azure Data Lake Storage Gen2. A mandatory
setting is the name of your Azure Data Lake Gen2 store/account
azure.datalake.gen2.account.name
to host the exported Kafka records. Another
mandatory configuration setting is azure.datalake.gen2.token.endpoint
. The
connector authenticates access to your data lake using this URL.
format.class=io.confluent.connect.azure.storage.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
These class settings are required to specify the output file format, which is
currently io.confluent.connect.azure.storage.format.avro.AvroFormat,
io.confluent.connect.azure.storage.format.json.JsonFormat,
io.confluent.connect.azure.storage.format.parquet.ParquetFormat, or
io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat, and the
partitioner class.

schema.compatibility=NONE
Finally, schema evolution is disabled in this example by setting
schema.compatibility
to NONE
, as explained above.
For detailed descriptions for all the available configuration options of the Azure Data Lake Storage Gen2 connector go to Configuration Reference for Azure Data Lake Storage Gen2 Sink Connector for Confluent Platform.
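Putting the fragments in this section together, a complete JSON sink configuration might resemble the following sketch. The connector name and placeholder credential values are illustrative assumptions only:

name=adls-gen2-json-sink
connector.class=io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector
tasks.max=1
topics=datalake_topic
flush.size=100
azure.datalake.gen2.client.id=<your client id>
azure.datalake.gen2.client.key=<your client key>
azure.datalake.gen2.account.name=<your account name>
azure.datalake.gen2.token.endpoint=<your azure oauth2 token endpoint>
format.class=io.confluent.connect.azure.storage.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092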
Write raw message values into Azure Data Lake Storage Gen2¶
It is possible to use the Azure Data Lake Storage Gen2 connector to write out the unmodified original message values into newline-separated files in Azure Data Lake Storage Gen2. To accomplish this, configure Kafka Connect so it does not deserialize any of the messages, and configure the Azure Data Lake Storage Gen2 connector to store the message values in a binary format in Azure Data Lake Storage Gen2.
The first part of the Azure Data Lake Storage Gen2 connector configuration is similar to other examples.
name=datalake-raw-sink
connector.class=io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector
tasks.max=1
topics=datalake_topic
flush.size=3
The topics
setting specifies the topics you want to export data from, which
is datalake_topic
in the example. The property flush.size
specifies the
number of records per partition the connector needs to write before completing
an upload to Azure Data Lake Storage Gen2.
Next, configure the account name, credentials, and compression type.
azure.datalake.gen2.account.name=<your account name>
azure.datalake.gen2.token.endpoint=<your azure oauth2 token endpoint>
azure.datalake.gen2.client.id=<your client id>
azure.datalake.gen2.client.key=<your client key>
az.compression.type=gzip
The next settings are specific to Azure Data Lake Storage Gen2. A mandatory
setting is the account name of your Azure Data Lake Storage Gen2 store,
azure.datalake.gen2.account.name, which will host the exported Kafka records.
Another mandatory configuration setting is azure.datalake.gen2.token.endpoint.
The connector authenticates access to your data lake using this URL. The
azure.datalake.gen2.client.id and azure.datalake.gen2.client.key properties are
your required Azure client credentials.
The az.compression.type
specifies that the Azure Data Lake Storage Gen2
connector should compress all Azure Data Lake Storage Gen2 files using GZIP
compression, adding the .gz
extension to any files (see below).
This example configuration is typical of most Azure Data Lake Storage Gen2 connectors. Now, configure the connector to read the raw message values and write them in binary format:
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat
schema.compatibility=NONE
The value.converter
setting overrides the connector default in the
Connect worker configuration. ByteArrayConverter
is used to instruct
Connect to skip deserializing the message values and provide the message
values in their raw binary form. The format.class
setting is used to
instruct the Azure Data Lake Storage Gen2 connector to write these binary message
values as-is into Azure Data Lake Storage Gen2 files. By default the messages
written to the same Azure Data Lake Storage Gen2 file are separated by a newline
character sequence, but you can control this with the
format.bytearray.separator
setting. You may want to consider setting this if
your messages might contain newlines. Also, by default the files written to Azure
Data Lake Storage Gen2 have an extension of .bin
(before compression, if
enabled), or you can use the format.bytearray.extension
setting to change
the pre-compression filename extension.
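For instance, if your raw values might contain newlines, you could hypothetically override the delimiter and the pre-compression extension with a fragment like the following. Both values are illustrative assumptions:

format.bytearray.separator=|||
format.bytearray.extension=.dat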
Next, you need to decide how you want to partition the consumed messages in Azure Data Lake Storage Gen2 files. You have a few options, including the default partitioner that preserves the same partitions as in Kafka:
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
Or, you could partition using the timestamp of the Kafka messages.
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Record
Or, you can use the timestamp at which the Azure Data Lake Storage Gen2 connector processes each message.
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Wallclock
Custom partitioners are always an option, too. Just be aware that since the
record value is an opaque binary value, Connect cannot extract timestamps
from fields using the RecordField
option.
The Azure Data Lake Storage Gen2 connector configuration outlined above results
in newline-delimited gzipped objects in Azure Data Lake Storage Gen2 with a
.bin.gz extension.