.. _connect_azure_data_lake_gen1_storage:

|kconnect-long| |az| Data Lake Storage Gen1 Sink Connector
==========================================================

You can use the Azure Data Lake Storage Gen1 connector, currently available as a sink, to export data from
|ak-tm| topics to Azure Data Lake Storage Gen1 files in either Avro or JSON format. Depending on your
environment, the Azure Data Lake Storage Gen1 connector can export data with exactly-once delivery
semantics to consumers of the Azure Data Lake Storage Gen1 files it produces.

The Azure Data Lake Storage Gen1 sink connector periodically polls data from |ak| and, in turn, uploads it to
Azure Data Lake Storage Gen1. A partitioner is used to split the data of every |ak| partition into chunks.
Each chunk of data is represented as an Azure Data Lake Storage Gen1 file. The key name encodes the topic,
the |ak| partition, and the start offset of this data chunk. If no partitioner is specified in the
configuration, the default partitioner which preserves |ak| partitioning is used. The size of each
data chunk is determined by the number of records written to Azure Data Lake Storage Gen1 and by schema
compatibility.
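
For example, with the default partitioner, a chunk of records from partition 0 of a topic named ``datalake_topic`` that starts at offset 0 is written to an object with a key like the following (the same layout appears later in the quick start):

.. sourcecode:: bash

  topics/datalake_topic/partition=0/datalake_topic+0+0000000000.avro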

Features
--------

The |az-long| Data Lake Storage Gen1 connector offers a variety of features:

* **Exactly Once Delivery**: Records that are exported using a deterministic partitioner are delivered with exactly-once
  semantics.

* **Pluggable Data Format with or without Schema**: Out of the box, the connector supports writing data to |az| Data Lake Storage Gen1 in Avro
  and JSON format. Besides records with schema, the connector supports exporting plain JSON and byte array records without schema to
  text files, one record per line. In general, the connector may accept any format that provides an implementation of
  the ``Format`` interface.

* **Schema Evolution**: When schemas are used, the connector supports schema evolution based on schema compatibility
  modes. The available modes are ``NONE``, ``BACKWARD``, ``FORWARD``, and ``FULL``, and a selection can be made
  by setting the property ``schema.compatibility`` in the connector's configuration (see the configuration sketch
  after this list). When the connector observes a schema change, it decides whether to roll the file or project
  the record to the proper schema according to the ``schema.compatibility`` configuration in use.

* **Pluggable Partitioner**: The connector comes out of the box with partitioners that support default partitioning
  based on |ak-tm| partitions, field partitioning, and time-based partitioning in days or hours. You may implement your
  own partitioners by extending the ``Partitioner`` class. Additionally, you can customize time-based partitioning by
  extending the ``TimeBasedPartitioner`` class.
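
As a minimal sketch (not a complete connector configuration), the format, partitioner, and schema compatibility
features described above are selected through connector properties such as the following; the values shown here
are only examples:

.. sourcecode:: bash

  format.class=io.confluent.connect.azure.storage.format.avro.AvroFormat
  partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
  schema.compatibility=BACKWARD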

.. _connect_azure_data_lake_gen1_storage_install:

Install |az| Data Lake Storage Gen1 Connector
---------------------------------------------


.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen1-storage:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen1-storage:1.1.1


--------------------------
Install Connector Manually
--------------------------

`Download and extract the ZIP file <https://www.confluent.io/connector/kafka-connect-s3-source/#download>`_ for your
connector and then follow the manual connector installation :ref:`instructions <connect_install_connectors>`.

.. _connect_azure_data_lake_gen1_storage_license:

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`azure-data-lake-connector-license-config` for license properties and :ref:`connect_azure_data_lake_gen1_storage_license-topic-configuration` for information about the license topic.


Quick Start
-----------

In this quick start, the |az| Data Lake Storage Gen1 connector is used to export data produced by the Avro console producer to |az| Data Lake Storage Gen1.


.. note:: **Before you begin:** create an |az| Data Lake Storage Gen1 account and grant **write access** to the user completing these procedures. See `Get started with Azure Data Lake Storage Gen1 using the Azure portal <https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-get-started-portal>`_ for additional information. Also see `Service-to-service authentication with Azure Data Lake Storage Gen1 using Azure Active Directory <https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-service-to-service-authenticate-using-active-directory>`_ for information on setting up the account needed for the |az| Data Lake Storage Gen1 connector.

Install the connector through the :ref:`confluent_hub_client`.

.. codewithvars:: bash

    # run from your Confluent Platform installation directory
    confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen1-storage:latest

.. tip:: By default, the plugin is installed into ``share/confluent-hub-components`` and the directory is added to the plugin path. If this is the first connector you have installed, you may need to restart the |kconnect| worker for the plugin path change to take effect.
   Also see `Azure Data Lake Gen1 CLI <https://docs.microsoft.com/en-gb/azure/data-lake-store/data-lake-store-get-started-cli-2.0>`_ for setting up and using the |az| CLI.
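
For example, you can optionally confirm that the |az| CLI can reach your Data Lake Storage Gen1 account before continuing. This check assumes the ``az dls`` command group is available in your Azure CLI installation:

.. sourcecode:: bash

    # Sign in and verify access to the Data Lake Storage Gen1 account (optional check).
    az login
    az dls account show --account <your account name>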

Start the services using the Confluent CLI.

.. codewithvars:: bash

     |confluent_start|

Each service starts in order, printing a message with its status.

.. include:: ../../includes/cli.rst
      :start-after: CE_CLI_startup_output
      :end-before: COS_CP_CLI_startup_output

.. note::

   Make sure the |az| Data Lake Storage Gen1 connector has write access to the |az| Data Lake Storage Gen1 account specified in ``azure.datalake.account.name`` and that the supplied credentials can authenticate successfully.

To produce a few records with a simple schema to |ak|, start the Avro console producer as follows:

.. sourcecode:: bash

    ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic datalake_topic \
      --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Then, in the console producer, enter the following:

.. sourcecode:: bash

  {"f1": "value1"}
  {"f1": "value2"}
  {"f1": "value3"}
  {"f1": "value4"}
  {"f1": "value5"}
  {"f1": "value6"}
  {"f1": "value7"}
  {"f1": "value8"}
  {"f1": "value9"}

The nine records entered are published to the |ak| topic ``datalake_topic`` in Avro format.
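
Optionally, you can verify that the records reached the topic by reading them back with the Avro console consumer. This is a quick sanity check, and the Schema Registry URL shown assumes the default local setup:

.. sourcecode:: bash

    ./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic datalake_topic \
      --property schema.registry.url=http://localhost:8081 --from-beginning --max-messages 9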

Create a ``datalake.properties`` file with the following contents:

.. codewithvars:: properties

  name=datalake-sink
  connector.class=io.confluent.connect.azure.datalake.gen1.AzureDataLakeGen1StorageSinkConnector
  tasks.max=1
  topics=datalake_topic
  flush.size=3
  azure.datalake.client.id=<your client id>
  azure.datalake.client.key=<your client key>
  azure.datalake.account.name=<your account name>
  azure.datalake.token.endpoint=<your azure oauth2 token endpoint>
  format.class=io.confluent.connect.azure.storage.format.avro.AvroFormat
  confluent.topic.bootstrap.servers=localhost:9092
  confluent.topic.replication.factor=1

Before starting the connector, make sure that the configuration in ``datalake.properties`` matches your |az| Data Lake Storage Gen1 setup. For this example, make sure that ``azure.datalake.account.name`` points to your Data Lake store, ``azure.datalake.client.id`` is set to your client (application) ID, and ``azure.datalake.client.key`` is set to the corresponding client secret. The client ID must have permission to write to the Azure Data Lake Storage Gen1 account.
Finally, set ``azure.datalake.token.endpoint`` to the OAuth 2.0 endpoint as described `here <https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-service-to-service-authenticate-using-active-directory#step-4-get-the-oauth-20-token-endpoint-only-for-java-based-applications>`_, and use the v1 token endpoint.
Then start the |az| Data Lake Storage Gen1 connector by loading its configuration with the following command.

.. include:: ../../includes/confluent-local-consume-limit.rst

.. codewithvars:: bash

   |confluent_load| datalake-sink|dash| -d datalake.properties
   {
    "name": "datalake-sink",
    "config": {
        "name":"datalake-sink",
        "connector.class":"io.confluent.connect.azure.datalake.gen1.AzureDataLakeGen1StorageSinkConnector",
        "tasks.max":"1",
        "topics":"datalake_topic",
        "flush.size":"3",
        "azure.datalake.client.id":"<your client id>",
        "azure.datalake.client.key":"<your client key>",
        "azure.datalake.account.name":"<your account name>",
        "azure.datalake.token.endpoint":"<your azure oauth2 token endpoint>",
        "format.class":"io.confluent.connect.azure.storage.format.avro.AvroFormat",
        "confluent.topic.bootstrap.servers":"localhost:9092",
        "confluent.topic.replication.factor":"1"
    },
     "tasks": []
   }

Check that the connector started successfully. Review the |kconnect| worker's log by entering the following:

.. codewithvars:: bash

    |confluent_log| connect

Towards the end of the log you should see that the connector starts, logs a few messages, and then uploads
data from |ak| to |az| Data Lake Storage Gen1.

Once the connector has ingested some records, check that the data is available in |az| Data Lake Storage Gen1. Use the following |az| CLI command:

.. sourcecode:: bash

    az dls fs list --account <your account name> --path /topics

After you navigate into the subfolders, you should see three objects with the following keys:

.. sourcecode:: bash

  topics/datalake_topic/partition=0/datalake_topic+0+0000000000.avro
  topics/datalake_topic/partition=0/datalake_topic+0+0000000003.avro
  topics/datalake_topic/partition=0/datalake_topic+0+0000000006.avro

Each filename is encoded as ``<topic>+<kafkaPartition>+<startOffset>.<format>``.

To verify the contents, copy each file from |az| Data Lake Storage Gen1 to your local filesystem. Use the following |az| CLI command, changing the destination path to one that makes sense for your environment:

.. sourcecode:: bash

    az dls fs download --account <your account name> --source-path /topics/datalake_topic/partition=0/datalake_topic+0+0000000000.avro --destination-path "C:\connect\datalake_topic+0+0000000000.avro"

Use ``avro-tools-1.9.0.jar``
(available in `Apache mirrors <http://mirror.metrocast.net/apache/avro/avro-1.9.0/java/avro-tools-1.9.0.jar>`_) to
print the records.

.. sourcecode:: bash

    java -jar avro-tools-1.9.0.jar tojson datalake_topic+0+0000000000.avro

For the file above, you should see the following output:

.. sourcecode:: bash

  {"f1":"value1"}
  {"f1":"value2"}
  {"f1":"value3"}

The rest of the records are contained in the other two files.

Finally, stop the |kconnect| worker and all other Confluent services by running:

.. codewithvars::  bash

    confluent local stop

Your output should resemble:

.. include:: ../../includes/cli.rst
    :start-after: ce_cli_stop_output_start
    :end-before: ce_cli_stop_output_stop

You can stop all services and remove any data generated during this quick start by entering the following command:

.. codewithvars:: bash

        |confluent_destroy|

Your output should resemble:

.. include:: ../../includes/cli.rst
    :start-after: ce_cli_stop_destroy_output_start
    :end-before: ce_cli_stop_destroy_output_stop


Exactly-once delivery
---------------------

The |az| Data Lake Storage Gen1 connector is able to provide exactly-once semantics to consumers of the objects it exports to
|az| Data Lake Storage Gen1, **if** the connector is supplied with a deterministic partitioner.

Currently, out of the available partitioners, the default and field partitioners are always deterministic. ``TimeBasedPartitioner`` can be deterministic with some configurations, as discussed below. This implies that when any of these partitioners is used, file splitting always happens at the same offsets for a given set of |ak| records. These partitioners take into account ``flush.size`` and ``schema.compatibility`` to decide when to roll and save a new file to |az| Data Lake Storage Gen1.

The connector always delivers files to |az| Data Lake Storage Gen1 that contain the same records, even in the presence of failures. If a connector task fails before an upload completes, the file remains in the ``temp/`` folder. If, on the other hand, a failure occurs after the upload has completed but before the corresponding offset is committed to |ak| by the connector, a re-upload takes place. However, this type of re-upload is transparent to the user of the |az| Data Lake Storage Gen1 folder, who at any time has access to the same records made eventually available by successful uploads to |az| Data Lake Storage Gen1.

To guarantee exactly-once semantics with the ``TimeBasedPartitioner``, the connector must be configured to use a deterministic implementation of ``TimestampExtractor`` and a deterministic rotation strategy.
The deterministic timestamp extractors are |ak| records (``timestamp.extractor=Record``) or record fields (``timestamp.extractor=RecordField``).
The deterministic rotation strategy configuration is ``rotate.interval.ms`` (setting ``rotate.schedule.interval.ms`` is nondeterministic and will invalidate exactly-once guarantees).
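
For example, a time-based partitioner configuration along the following lines keeps the guarantees intact. This is a minimal sketch; adjust the path format, partition duration, and timezone for your environment:

.. sourcecode:: bash

  # Deterministic time-based partitioning: timestamps come from the records themselves
  # and files are rotated based on record timestamps, not wall-clock time.
  partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
  partition.duration.ms=3600000
  path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
  locale=en-US
  timezone=UTC
  timestamp.extractor=Record
  rotate.interval.ms=600000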

.. image:: ../../images/connect-s3-eos.png

Schema Evolution
----------------

The |az| Data Lake Storage Gen1 connector supports schema evolution and reacts to schema changes of data according to the
``schema.compatibility`` configuration. This section explains how the
connector reacts to schema evolution under different values of ``schema.compatibility``. The
``schema.compatibility`` can be set to ``NONE``, ``BACKWARD``, ``FORWARD``, or ``FULL``, which means
NO compatibility, BACKWARD compatibility, FORWARD compatibility, and FULL compatibility, respectively.

* **NO Compatibility**: By default, the ``schema.compatibility`` is set to ``NONE``. In this case,
  the connector ensures that each file written to |az| Data Lake Storage Gen1 has the proper schema. When the connector
  observes a schema change in data, it commits the current set of files for the affected topic
  partitions and writes the data with new schema in new files.

* **BACKWARD Compatibility**: If a schema is evolved in a backward compatible way, the connector can always
  use the latest schema to query all the data uniformly. For example, removing fields is a backward
  compatible change to a schema, since when the connector encounters records written with the old schema that
  contain these fields, the connector can just ignore them. Adding a field with a default value is also backward
  compatible (see the producer example after this list).

  If ``BACKWARD`` is specified in the ``schema.compatibility``, the connector keeps track
  of the latest schema used in writing data to |az| Data Lake Storage Gen1. If a data record with a schema version
  later than the current latest schema arrives, the connector commits the current set of files
  and writes the data record with the new schema to new files. For data records arriving at a later time
  that use an earlier schema, the connector projects the data record to the latest schema
  before writing to the same set of files in |az| Data Lake Storage Gen1.

* **FORWARD Compatibility**: If a schema is evolved in a forward compatible way, the connector can always
  use the oldest schema to query all the data uniformly. Removing a field that had a default value
  is forward compatible, since the old schema will use the default value when the field is missing.

  If ``FORWARD`` is specified in the ``schema.compatibility``, the connector projects the data to
  the oldest schema before writing to the same set of files in |az| Data Lake Storage Gen1.

* **FULL Compatibility**: Full compatibility means that old data can be read with the new schema
  and new data can also be read with the old schema.

  If ``FULL`` is specified in the ``schema.compatibility``, the connector performs the same action
  as ``BACKWARD``.
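
For example, adding a field with a default value to the quick start schema is a backward compatible change: when the first record with the evolved schema arrives, the connector rolls to new files, and later records that still use the old schema are projected to the new one. The evolved schema below is only an illustration:

.. sourcecode:: bash

    ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic datalake_topic \
      --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name":"f2","type":"string","default":""}]}'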

Schema evolution in the |az| Data Lake Storage Gen1 connector works the same way as :ref:`s3_schema_evolution`.

----------------------------------------------------------
Write JSON message values into |az| Data Lake Storage Gen1
----------------------------------------------------------

The example settings file is shown below:

.. sourcecode:: bash

    name=datalake-sink
    connector.class=io.confluent.connect.azure.datalake.gen1.AzureDataLakeGen1StorageSinkConnector
    tasks.max=1
    topics=datalake_topic
    flush.size=100

    # Required configuration
    azure.datalake.client.id=<your client id>
    azure.datalake.client.key=<your client key>

    # The following define the information used to validate the license stored in Kafka
    confluent.license=
    confluent.topic.bootstrap.servers=localhost:9092

The first few settings are common to most connectors. ``topics`` specifies the topics to export data from, in
this case ``datalake_topic``. The property ``flush.size`` specifies the number of records per partition the connector must
write before completing a multiblock upload to |az| Data Lake Storage Gen1.

The ``azure.datalake.client.id`` and ``azure.datalake.client.key`` properties are your required |az| credentials.
This is a licensed Confluent connector; the ``confluent.license`` and ``confluent.topic.bootstrap.servers`` properties shown above are used to validate the license, and leaving ``confluent.license`` empty is sufficient for testing purposes.
For more information, see :ref:`the |az| Data Lake Storage Gen1 Licensing section <connect_azure_data_lake_gen1_storage_license>`.

.. sourcecode:: bash

  azure.datalake.account.name=<your account name>
  azure.datalake.token.endpoint=<your azure oauth2 token endpoint>

The next settings are specific to |az| Data Lake Storage Gen1. A mandatory setting is ``azure.datalake.account.name``, the name of the |az| Data Lake Storage Gen1 store/account that hosts the exported
|ak| records. Another mandatory setting is ``azure.datalake.token.endpoint``. The connector authenticates access to your data lake using this URL.
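
The v1 token endpoint takes the form ``https://login.microsoftonline.com/<tenant-id>/oauth2/token``. If you are unsure of your tenant ID, one optional way to look it up is with the |az| CLI:

.. sourcecode:: bash

  # Print the Azure Active Directory tenant ID for the current subscription.
  az account show --query tenantId --output tsv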

.. sourcecode:: bash

  format.class=io.confluent.connect.azure.storage.format.json.JsonFormat
  partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

These class settings are required to specify the output file format, which is currently
``io.confluent.connect.azure.storage.format.avro.AvroFormat``, ``io.confluent.connect.azure.storage.format.json.JsonFormat``, or ``io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat``, and the partitioner
class.

.. sourcecode:: bash

  schema.compatibility=NONE

Finally, schema evolution is disabled in this example by setting ``schema.compatibility`` to ``NONE``, as explained above.

For detailed descriptions for all the available configuration options of the |az| Data Lake Storage Gen1 connector go to :ref:`connect_azure_data_lake_gen1_storage_configuration_options`.

---------------------------------------------------------
Write raw message values into |az| Data Lake Storage Gen1
---------------------------------------------------------

It is possible to use the |az| Data Lake Storage Gen1 connector to write out the unmodified original message values into
newline-separated files in |az| Data Lake Storage Gen1. To accomplish this, configure |kconnect-long| so it does not deserialize any of the
messages, and configure the |az| Data Lake Storage Gen1 connector to store the message values in a binary format in |az| Data Lake Storage Gen1.

The first part of the |az| Data Lake Storage Gen1 connector configuration is similar to other examples.

.. sourcecode:: bash

  name=datalake-raw-sink
  connector.class=io.confluent.connect.azure.datalake.gen1.AzureDataLakeGen1StorageSinkConnector
  tasks.max=1
  topics=datalake_topic
  flush.size=3

The ``topics`` setting specifies the topics you want to export data from, which is ``datalake_topic`` in the example.
The property ``flush.size`` specifies the number of records per partition the connector needs
to write before completing an upload to |az| Data Lake Storage Gen1.

Next, configure the |az| Data Lake Storage Gen1 account, the authentication credentials, and the compression type.

.. sourcecode:: bash

  azure.datalake.account.name=myconfluentdatalake
  azure.datalake.token.endpoint=https://login.microsoftonline.com/a7d99622-a589-4520-8ce3-c280ed1cb00c/oauth2/token
  azure.datalake.client.id=21aaeb79-1956-486a-bc36-baa1f710d567
  azure.datalake.client.key=HGw@4@DSkjBRslXA4vuR:-lxQ4H3+PTs
  az.compression.type=gzip

The next settings are specific to |az| Data Lake Storage Gen1. A mandatory setting is the account name of your |az| Data Lake Storage Gen1 store, ``azure.datalake.account.name``, which will host the exported |ak| records. Another mandatory configuration setting is ``azure.datalake.token.endpoint``. The connector authenticates access to your data lake using this URL. The ``azure.datalake.client.id`` and ``azure.datalake.client.key`` properties are your required |az| client credentials.

The ``az.compression.type`` specifies that the |az| Data Lake Storage Gen1 connector should compress all |az| Data Lake Storage Gen1 files
using GZIP compression, adding the ``.gz`` extension to any files (see below).

This example configuration is typical of most |az| Data Lake Storage Gen1 connectors.
Now, configure the connector to read the raw message values and write them in
binary format:

.. sourcecode:: bash

  value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
  format.class=io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat
  schema.compatibility=NONE

The ``value.converter`` setting overrides the connector default in the |kconnect|
worker configuration. ``ByteArrayConverter`` is used to instruct |kconnect| to skip
deserializing the message values and provide the message values in their raw
binary form. The ``format.class`` setting is used to instruct the |az| Data Lake Storage Gen1 connector to write these
binary message values as-is into |az| Data Lake Storage Gen1 files. By default the messages written to the same |az| Data Lake Storage Gen1
file are separated by a newline character sequence, but you can control this with the
``format.bytearray.separator`` setting. You may want to consider setting this if your messages might
contain newlines. Also, by default the files written to |az| Data Lake Storage Gen1 have an
extension of ``.bin`` (before compression, if enabled), or you can use the
``format.bytearray.extension`` setting to change the pre-compression filename extension.
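
For example, to use a custom record delimiter and a different pre-compression extension, you could add settings along the following lines; the values shown are only placeholders:

.. sourcecode:: bash

  # Separate raw records with a custom delimiter and change the file extension.
  format.bytearray.separator=;
  format.bytearray.extension=.raw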

Next, you need to decide how you want to partition the consumed messages in |az| Data Lake Storage Gen1 files. You have a few
options, including the default partitioner that preserves the same partitions as in |ak|:

.. sourcecode:: bash

  partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

Or, you could partition using the timestamp of the |ak| messages.

.. sourcecode:: bash

  partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
  timestamp.extractor=Record

Or, you can use the timestamp at which the |az| Data Lake Storage Gen1 connector processes each message.

.. sourcecode:: bash

  partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
  timestamp.extractor=Wallclock

Custom partitioners are always an option, too. Just be aware that since the record value is
an opaque binary value, |kconnect| cannot extract timestamps from fields using the ``RecordField``
option.

The |az| Data Lake Storage Gen1 connector configuration outlined above results in newline-delimited gzipped objects in |az| Data Lake Storage Gen1
with names ending in ``.bin.gz``.
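
For example, with the default partitioner and the quick start topic, the uploaded object keys would look similar to the following illustrative listing:

.. sourcecode:: bash

  topics/datalake_topic/partition=0/datalake_topic+0+0000000000.bin.gz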

Additional Documentation
------------------------


.. toctree::
   :maxdepth: 1

   configuration_options