.. _connect_azure_data_lake_gen1_storage:

|kconnect-long| |az| Data Lake Storage Gen1 Sink Connector
==========================================================

You can use the |az| Data Lake Storage Gen1 connector, currently available as a sink, to export data from |ak-tm| topics to |az| Data Lake Storage Gen1 files in either Avro or JSON format. Depending on your environment, the connector can guarantee exactly-once delivery semantics to consumers of the |az| Data Lake Storage Gen1 files it produces.

The |az| Data Lake Storage Gen1 sink connector periodically polls data from |ak| and, in turn, uploads it to |az| Data Lake Storage Gen1. A partitioner is used to split the data of every |ak| partition into chunks. Each chunk of data is represented as an |az| Data Lake Storage Gen1 file. The file name encodes the topic, the |ak| partition, and the start offset of this data chunk. If no partitioner is specified in the configuration, the default partitioner, which preserves |ak| partitioning, is used. The size of each data chunk is determined by the number of records written to |az| Data Lake Storage Gen1 and by schema compatibility.

Features
--------

The |az-long| Data Lake Storage Gen1 connector offers a variety of features:

* **Exactly Once Delivery**: Records that are exported using a deterministic partitioner are delivered with exactly-once semantics.
* **Pluggable Data Format with or without Schema**: Out of the box, the connector supports writing data to |az| Data Lake Storage Gen1 in Avro and JSON format. Besides records with schema, the connector supports exporting plain JSON and byte array records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the ``Format`` interface.
* **Schema Evolution**: When schemas are used, the connector supports schema evolution based on schema compatibility modes. The available modes are ``NONE``, ``BACKWARD``, ``FORWARD``, and ``FULL``, and you select one by setting the ``schema.compatibility`` property in the connector's configuration. When the connector observes a schema change, it decides whether to roll the file or project the record to the proper schema according to the ``schema.compatibility`` configuration in use.
* **Pluggable Partitioner**: The connector comes out of the box with partitioners that support default partitioning based on |ak| partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the ``Partitioner`` class. Additionally, you can customize time-based partitioning by extending the ``TimeBasedPartitioner`` class.

.. _connect_azure_data_lake_gen1_storage_install:

Install |az| Data Lake Storage Gen1 Connector
---------------------------------------------

.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen1-storage:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen1-storage:1.1.1

--------------------------
Install Connector Manually
--------------------------

`Download and extract the ZIP file <https://www.confluent.io/hub/confluentinc/kafka-connect-azure-data-lake-gen1-storage>`_ for your connector and then follow the manual connector installation :ref:`instructions <connect_install_connectors>`.

.. _connect_azure_data_lake_gen1_storage_license:

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`azure-data-lake-connector-license-config` for license properties and :ref:`connect_azure_data_lake_gen1_storage_license-topic-configuration` for information about the license topic.

Quick Start
-----------

In this quick start, the |az| Data Lake Storage Gen1 connector is used to export data produced by the Avro console producer to |az| Data Lake Storage Gen1.

.. note::

   **Before you begin:** Create an |az| Data Lake Storage Gen1 account and grant **write access** to the user completing these procedures. See `Get started with Azure Data Lake Storage Gen1 using the Azure portal <https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-get-started-portal>`_ for additional information. Also see `Service-to-service authentication with Azure Data Lake Storage Gen1 using Azure Active Directory <https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-service-to-service-authenticate-using-active-directory>`_ for information on setting up the account needed for the |az| Data Lake Storage Gen1 connector.

Install the connector through the :ref:`confluent_hub_client`.

.. codewithvars:: bash

   # run from your Confluent Platform installation directory
   confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen1-storage:latest

.. tip::

   By default, the plugin is installed into ``share/confluent-hub-components`` and that directory is added to the plugin path. If this is the first connector you have installed, you may need to restart the |kconnect| worker for the plugin path change to take effect. Also see `Azure Data Lake Gen1 CLI <https://docs.microsoft.com/en-gb/azure/data-lake-store/data-lake-store-get-started-cli-2.0>`_ for setting up and using the |az| CLI.

Start the services using the Confluent CLI.

.. codewithvars:: bash

   |confluent_start|

Each service starts in order, printing a message with its status.

.. include:: ../../includes/cli.rst
   :start-after: CE_CLI_startup_output
   :end-before: COS_CP_CLI_startup_output

.. note::

   Make sure the |az| Data Lake Storage Gen1 connector has write access to the |az| Data Lake Storage Gen1 account specified in ``azure.datalake.account.name`` and can deploy credentials successfully.

To import a few records with a simple schema in |ak|, start the Avro console producer as follows:

.. sourcecode:: bash

   ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic datalake_topic \
     --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Then, in the console producer, enter the following:

.. sourcecode:: bash

   {"f1": "value1"}
   {"f1": "value2"}
   {"f1": "value3"}
   {"f1": "value4"}
   {"f1": "value5"}
   {"f1": "value6"}
   {"f1": "value7"}
   {"f1": "value8"}
   {"f1": "value9"}

The nine records entered are published to the |ak| topic ``datalake_topic`` in Avro format.

Create a ``datalake.properties`` file with the following contents:

.. codewithvars:: properties

   name=datalake-sink
   connector.class=io.confluent.connect.azure.datalake.gen1.AzureDataLakeGen1StorageSinkConnector
   tasks.max=1
   topics=datalake_topic
   flush.size=3
   azure.datalake.client.id=<your client id>
   azure.datalake.client.key=<your client key>
   azure.datalake.account.name=<your account name>
   azure.datalake.token.endpoint=<your azure oauth2 token endpoint>
   format.class=io.confluent.connect.azure.storage.format.avro.AvroFormat
   confluent.topic.bootstrap.servers=localhost:9092
   confluent.topic.replication.factor=1

Before starting the connector, make sure that the settings in ``datalake.properties`` match your |az| Data Lake Storage Gen1 environment. For this example, make sure that ``azure.datalake.account.name`` points to your Data Lake store, ``azure.datalake.client.id`` is set to your client ID, and ``azure.datalake.client.key`` is set to the corresponding secret key. The client ID must have permission to write to the |az| Data Lake Storage Gen1 account.
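
It is easy to leave a ``<...>`` placeholder unfilled at this step. The following minimal, hypothetical Python sketch is not a Confluent tool. It parses the text of a properties file and lists the required keys that are missing, empty, or still hold a placeholder. The ``REQUIRED_KEYS`` list is an assumption taken from this example, not the connector's own validation rules. The parser handles only simple ``key=value`` lines. The sketch also flags ``azure.datalake.token.endpoint``, which you set in the next step.

.. sourcecode:: python

   import re

   # Assumption: the keys this quick start asks you to fill in. This is not an
   # authoritative list of the connector's required settings.
   REQUIRED_KEYS = [
       "azure.datalake.account.name",
       "azure.datalake.client.id",
       "azure.datalake.client.key",
       "azure.datalake.token.endpoint",
   ]

   # Matches placeholder values such as "<your client id>".
   PLACEHOLDER = re.compile(r"<[^>]*>")


   def parse_properties(text):
       """Parse simple key=value lines, skipping blank lines and #/! comments.

       Java properties files also allow ':' separators and line continuations;
       this sketch deliberately ignores those forms.
       """
       props = {}
       for line in text.splitlines():
           line = line.strip()
           if not line or line.startswith(("#", "!")):
               continue
           key, sep, value = line.partition("=")
           if sep:
               props[key.strip()] = value.strip()
       return props


   def unfilled_settings(text):
       """Return required keys that are absent, empty, or still a placeholder."""
       props = parse_properties(text)
       return [
           key
           for key in REQUIRED_KEYS
           if not props.get(key) or PLACEHOLDER.search(props[key])
       ]

For example, ``unfilled_settings(open("datalake.properties").read())`` returns an empty list once every placeholder has been replaced with a real value.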

Finally, set ``azure.datalake.token.endpoint`` to the OAuth 2.0 token endpoint as described in `Get the OAuth 2.0 token endpoint <https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-service-to-service-authenticate-using-active-directory#step-4-get-the-oauth-20-token-endpoint-only-for-java-based-applications>`_, and use the v1 token endpoint.

Then start the |az| Data Lake Storage Gen1 connector by loading its configuration with the following command.

.. include:: ../../includes/confluent-local-consume-limit.rst

.. codewithvars:: bash

   |confluent_load| datalake-sink|dash| -d datalake.properties

Your output should resemble:

.. sourcecode:: json

   {
     "name": "datalake-sink",
     "config": {
       "name": "datalake-sink",
       "connector.class": "io.confluent.connect.azure.datalake.gen1.AzureDataLakeGen1StorageSinkConnector",
       "tasks.max": "1",
       "topics": "datalake_topic",
       "flush.size": "3",
       "azure.datalake.client.id": "<your client id>",
       "azure.datalake.client.key": "<your client key>",
       "azure.datalake.account.name": "<your account name>",
       "azure.datalake.token.endpoint": "<your azure oauth2 token endpoint>",
       "format.class": "io.confluent.connect.azure.storage.format.avro.AvroFormat",
       "confluent.topic.bootstrap.servers": "localhost:9092",
       "confluent.topic.replication.factor": "1"
     },
     "tasks": []
   }

Check that the connector started successfully. Review the |kconnect| worker's log by entering the following:

.. codewithvars:: bash

   |confluent_log| connect

Towards the end of the log you should see that the connector starts, logs a few messages, and then uploads data from |ak| to |az| Data Lake Storage Gen1.

Once the connector has ingested some records, check that the data is available in |az| Data Lake Storage Gen1. Use the following |az| CLI command:

.. sourcecode:: bash

   az dls fs list --account <your account name> --path /topics

Once you navigate into the subfolders, you should see three files:

.. sourcecode:: bash

   topics/datalake_topic/partition=0/datalake_topic+0+0000000000.avro
   topics/datalake_topic/partition=0/datalake_topic+0+0000000003.avro
   topics/datalake_topic/partition=0/datalake_topic+0+0000000006.avro

Each file name is encoded as ``<topic>+<kafkaPartition>+<startOffset>.<format>``.

To verify the contents, copy each file from |az| Data Lake Storage Gen1 to your local filesystem. Use the following |az| CLI command, changing the destination path to suit your environment:

.. sourcecode:: bash

   az dls fs download --account <your account name> --source-path /topics/datalake_topic/partition=0/datalake_topic+0+0000000000.avro --destination-path "C:\connect\datalake_topic+0+0000000000.avro"

Use ``avro-tools-1.9.0.jar`` (available in `Apache mirrors <http://mirror.metrocast.net/apache/avro/avro-1.9.0/java/avro-tools-1.9.0.jar>`_) to print the records.

.. sourcecode:: bash

   java -jar avro-tools-1.9.0.jar tojson datalake_topic+0+0000000000.avro

For the file above, you should see the following output:

.. sourcecode:: bash

   {"f1":"value1"}
   {"f1":"value2"}
   {"f1":"value3"}

The rest of the records are contained in the other two files.

Finally, stop the |kconnect| worker and all other Confluent services by running:

.. codewithvars:: bash

   confluent local stop

Your output should resemble:

.. include:: ../../includes/cli.rst
   :start-after: ce_cli_stop_output_start
   :end-before: ce_cli_stop_output_stop

You can stop all services and remove any data generated during this quick start by entering the following command:

.. codewithvars:: bash

   |confluent_destroy|

Your output should resemble:

.. include:: ../../includes/cli.rst
   :start-after: ce_cli_stop_destroy_output_start
   :end-before: ce_cli_stop_destroy_output_stop

Exactly-once delivery
---------------------

The |az| Data Lake Storage Gen1 connector can provide exactly-once semantics to consumers of the objects it exports to |az| Data Lake Storage Gen1, **if** the connector is supplied with a deterministic partitioner.
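
To see why deterministic file splitting makes re-uploads harmless, consider the following minimal Python sketch. It is a simplification, not the connector's implementation. The sketch splits one |ak| partition's offsets into chunks of ``flush.size`` records and names each chunk using the ``<topic>+<kafkaPartition>+<startOffset>.<format>`` scheme from the quick start. It assumes start offsets are zero-padded to ten digits, as in the example file names, and it ignores schema-driven rolls and time-based rotation.

.. sourcecode:: python

   def chunk_start_offsets(offsets, flush_size):
       """Start offsets of the complete chunks when a file rolls every flush_size records.

       In this sketch a trailing partial chunk is not written yet.
       """
       last_start = len(offsets) - flush_size
       return [offsets[i] for i in range(0, last_start + 1, flush_size)]


   def file_name(topic, partition, start_offset, extension):
       """Build <topic>+<kafkaPartition>+<startOffset>.<format> with a 10-digit offset."""
       return f"{topic}+{partition}+{start_offset:010d}.{extension}"


   def file_names(topic, partition, offsets, flush_size, extension="avro"):
       return [
           file_name(topic, partition, start, extension)
           for start in chunk_start_offsets(offsets, flush_size)
       ]


   if __name__ == "__main__":
       offsets = list(range(9))  # nine records, as in the quick start
       first_run = file_names("datalake_topic", 0, offsets, flush_size=3)
       # Replaying the same offsets after a failure yields the same names,
       # so a re-upload targets the same files with the same records.
       assert first_run == file_names("datalake_topic", 0, offsets, flush_size=3)
       print("\n".join(first_run))

With nine records and ``flush.size=3``, the sketch produces the same three file names listed in the quick start. Because the names depend only on offsets, a replay after a failure cannot produce differently split files.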

Currently, among the available partitioners, the default and field partitioners are always deterministic. ``TimeBasedPartitioner`` can be deterministic with some configurations, as discussed below. When any of these partitioners is used, file splitting always happens at the same offsets for a given set of |ak| records. These partitioners take into account ``flush.size`` and ``schema.compatibility`` to decide when to roll and save a new file to |az| Data Lake Storage Gen1.

The connector always delivers files in |az| Data Lake Storage Gen1 that contain the same records, even in the presence of failures. If a connector task fails before an upload completes, the file remains in the ``temp/`` folder. If, on the other hand, a failure occurs after the upload has completed but before the connector commits the corresponding offset to |ak|, the upload is repeated. However, this re-upload is transparent to users of the |az| Data Lake Storage Gen1 folder, who at any time have access to the same records made eventually available by successful uploads to |az| Data Lake Storage Gen1.

To guarantee exactly-once semantics with the ``TimeBasedPartitioner``, the connector must be configured to use a deterministic implementation of ``TimestampExtractor`` and a deterministic rotation strategy. The deterministic timestamp extractors use either the |ak| record timestamp (``timestamp.extractor=Record``) or a record field (``timestamp.extractor=RecordField``). The deterministic rotation strategy configuration is ``rotate.interval.ms``. Setting ``rotate.schedule.interval.ms`` is nondeterministic and invalidates exactly-once guarantees.

.. image:: ../../images/connect-s3-eos.png

Schema Evolution
----------------

The |az| Data Lake Storage Gen1 connector supports schema evolution and reacts to schema changes of data according to the ``schema.compatibility`` configuration.
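
The following minimal Python sketch summarizes the roll-or-project decisions described in this section. It is a simplified model, not the connector's code, and rests on three assumptions:

* Schemas are reduced to integer version numbers. For ``NONE``, the real comparison is between whole schemas.
* ``flush.size`` is ignored.
* The ``FORWARD`` branch follows the Amazon S3 connector, whose schema evolution this connector is documented to share. In that model, an older incoming version rolls the file, and a newer one is projected to the current, older schema.

.. sourcecode:: python

   from enum import Enum


   class Compatibility(Enum):
       NONE = "NONE"
       BACKWARD = "BACKWARD"
       FORWARD = "FORWARD"
       FULL = "FULL"


   def decide(mode, current_version, incoming_version):
       """Return "write", "roll", or "project" for an incoming record.

       "roll":    commit the open files and start new ones with the incoming schema.
       "project": convert the record to the current schema and keep writing.
       "write":   the schema is unchanged, so write the record as-is.
       """
       if incoming_version == current_version:
           return "write"
       if mode is Compatibility.NONE:
           return "roll"  # any schema change starts new files
       if mode in (Compatibility.BACKWARD, Compatibility.FULL):
           # Keep the latest schema: newer versions roll, older ones are projected.
           return "roll" if incoming_version > current_version else "project"
       # FORWARD (assumed to mirror the S3 connector): keep the oldest schema.
       return "roll" if incoming_version < current_version else "project"


   def count_files(mode, versions):
       """Files one partition would get for a non-empty sequence of schema versions."""
       current, files = versions[0], 1
       for version in versions[1:]:
           if decide(mode, current, version) == "roll":
               current, files = version, files + 1
       return files

For example, with interleaved versions ``[1, 2, 1, 2]``, ``count_files`` returns 4 files under ``NONE``, 2 under ``BACKWARD``, and 1 under ``FORWARD``. This shows why ``NONE`` can produce many small files when producers with different schemas write to the same partition.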

This section explains how the connector reacts to schema evolution under different values of ``schema.compatibility``. The ``schema.compatibility`` setting can be ``NONE``, ``BACKWARD``, ``FORWARD``, or ``FULL``, which correspond to no compatibility, backward compatibility, forward compatibility, and full compatibility, respectively.

* **No Compatibility**: By default, ``schema.compatibility`` is set to ``NONE``. In this case, the connector ensures that each file written to |az| Data Lake Storage Gen1 has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with the new schema in new files.
* **Backward Compatibility**: If a schema is evolved in a backward compatible way, the connector can always use the latest schema to query all the data uniformly. For example, removing fields is a backward compatible change to a schema, since when the connector encounters records written with the old schema that contain these fields, it can just ignore them. Adding a field with a default value is also backward compatible. If ``BACKWARD`` is specified in ``schema.compatibility``, the connector keeps track of the latest schema used in writing data to |az| Data Lake Storage Gen1. If a data record with a schema version larger than the current latest schema arrives, the connector commits the current set of files and writes the data record with the new schema to new files. For data records arriving later that use an earlier schema, the connector projects the data record to the latest schema before writing to the same set of files in |az| Data Lake Storage Gen1.
* **Forward Compatibility**: If a schema is evolved in a forward compatible way, the connector can always use the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema uses the default value when the field is missing. If ``FORWARD`` is specified in ``schema.compatibility``, the connector projects the data to the oldest schema before writing to the same set of files in |az| Data Lake Storage Gen1.
* **Full Compatibility**: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema. If ``FULL`` is specified in ``schema.compatibility``, the connector performs the same action as ``BACKWARD``.

Schema evolution in the |az| Data Lake Storage Gen1 connector works the same way as described in :ref:`s3_schema_evolution`.

----------------------------------------------------------
Write JSON message values into |az| Data Lake Storage Gen1
----------------------------------------------------------

The example settings file is shown below:

.. sourcecode:: properties

   name=datalake-sink
   connector.class=io.confluent.connect.azure.datalake.gen1.AzureDataLakeGen1StorageSinkConnector
   tasks.max=1
   topics=datalake_topic
   flush.size=100

   # Required configuration
   azure.datalake.client.id=<your client id>
   azure.datalake.client.key=<your client key>

   # The following define the information used to validate the license stored in Kafka
   confluent.license=
   confluent.topic.bootstrap.servers=localhost:9092

The first few settings are common to most connectors. ``topics`` specifies the topics to export data from, in this case ``datalake_topic``. The property ``flush.size`` specifies the number of records per partition the connector needs to write before completing an upload to |az| Data Lake Storage Gen1. The ``azure.datalake.client.id`` and ``azure.datalake.client.key`` settings are your required |az| credentials.

This is a licensed Confluent connector. The ``confluent.license`` and ``confluent.topic.bootstrap.servers`` settings above are used to validate the license. For testing purposes, you can leave ``confluent.license`` empty, as shown. For more information, see the :ref:`Azure Data Lake Storage Gen1 license section <connect_azure_data_lake_gen1_storage_license>`.

.. sourcecode:: properties

   azure.datalake.account.name=<your account name>
   azure.datalake.token.endpoint=<your azure oauth2 token endpoint>

The next settings are specific to |az| Data Lake Storage Gen1. One mandatory setting is ``azure.datalake.account.name``, the name of the |az| Data Lake Storage Gen1 store (account) that hosts the exported |ak| records. Another mandatory setting is ``azure.datalake.token.endpoint``. The connector uses this URL to authenticate access to your data lake.

.. sourcecode:: properties

   format.class=io.confluent.connect.azure.storage.format.json.JsonFormat
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

These class settings specify the output file format and the partitioner class. The supported formats are currently ``io.confluent.connect.azure.storage.format.avro.AvroFormat``, ``io.confluent.connect.azure.storage.format.json.JsonFormat``, and ``io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat``.

.. sourcecode:: properties

   schema.compatibility=NONE

Finally, schema evolution is disabled in this example by setting ``schema.compatibility`` to ``NONE``, as explained above.

For detailed descriptions of all the available configuration options of the |az| Data Lake Storage Gen1 connector, see :ref:`connect_azure_data_lake_gen1_storage_configuration_options`.

---------------------------------------------------------
Write raw message values into |az| Data Lake Storage Gen1
---------------------------------------------------------

You can use the |az| Data Lake Storage Gen1 connector to write the unmodified original message values into newline-separated files in |az| Data Lake Storage Gen1. To do this, configure |kconnect-long| so that it does not deserialize any of the messages. Then configure the |az| Data Lake Storage Gen1 connector to store the message values in a binary format in |az| Data Lake Storage Gen1.
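
Reading such a file back only requires splitting it on the separator. The following minimal Python sketch illustrates that round trip, using gzip compression where the configuration below enables it. It is an illustration, not the connector's code. It assumes that the separator is a single newline byte and that each value is followed by one separator. It also shows why a value that itself contains a newline cannot be recovered unambiguously.

.. sourcecode:: python

   import gzip

   SEPARATOR = b"\n"  # assumption: the default newline separator


   def write_raw_file(values, compress=False):
       """Write raw message values one per line, optionally gzip-compressed."""
       data = b"".join(value + SEPARATOR for value in values)
       return gzip.compress(data) if compress else data


   def read_raw_file(data, compressed=False):
       """Split a raw file back into message values."""
       if compressed:
           data = gzip.decompress(data)
       parts = data.split(SEPARATOR)
       if parts and parts[-1] == b"":
           parts.pop()  # drop the empty piece after the final separator
       return parts


   if __name__ == "__main__":
       values = [b"alpha", b"\x00\x01binary", b"gamma"]
       blob = write_raw_file(values, compress=True)
       assert read_raw_file(blob, compressed=True) == values
       # A value that contains the separator comes back as two values.
       print(read_raw_file(write_raw_file([b"line1\nline2"])))

Binary values survive the round trip unchanged as long as they do not contain the separator byte sequence.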

The first part of the |az| Data Lake Storage Gen1 connector configuration is similar to other examples.

.. sourcecode:: properties

   name=datalake-raw-sink
   connector.class=io.confluent.connect.azure.datalake.gen1.AzureDataLakeGen1StorageSinkConnector
   tasks.max=1
   topics=datalake_topic
   flush.size=3

The ``topics`` setting specifies the topics you want to export data from, which is ``datalake_topic`` in the example. The property ``flush.size`` specifies the number of records per partition the connector needs to write before completing an upload to |az| Data Lake Storage Gen1.

Next, configure the account name, token endpoint, client credentials, and compression type.

.. sourcecode:: properties

   azure.datalake.account.name=myconfluentdatalake
   azure.datalake.token.endpoint=https://login.microsoftonline.com/<your tenant id>/oauth2/token
   azure.datalake.client.id=<your client id>
   azure.datalake.client.key=<your client key>
   az.compression.type=gzip

The next settings are specific to |az| Data Lake Storage Gen1. One mandatory setting is ``azure.datalake.account.name``, the account name of your |az| Data Lake Storage Gen1 store, which hosts the exported |ak| records. Another mandatory setting is ``azure.datalake.token.endpoint``. The connector uses this URL to authenticate access to your data lake. The ``azure.datalake.client.id`` and ``azure.datalake.client.key`` settings are your required |az| client credentials. The ``az.compression.type`` setting tells the connector to compress all |az| Data Lake Storage Gen1 files using GZIP compression, adding the ``.gz`` extension to all files (see below).

This example configuration is typical of most |az| Data Lake Storage Gen1 connectors. Now, configure the connector to read the raw message values and write them in binary format:

.. sourcecode:: properties

   value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
   format.class=io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat
   schema.compatibility=NONE

For this connector, the ``value.converter`` setting overrides the default converter in the |kconnect| worker configuration. ``ByteArrayConverter`` instructs |kconnect| to skip deserializing the message values and to provide them in their raw binary form. The ``format.class`` setting instructs the |az| Data Lake Storage Gen1 connector to write these binary message values as-is into |az| Data Lake Storage Gen1 files. By default, messages written to the same file are separated by a newline character sequence. You can change this with the ``format.bytearray.separator`` setting, which is worth considering if your messages might contain newlines. Also, by default the files written to |az| Data Lake Storage Gen1 have a ``.bin`` extension (before compression, if enabled). You can use the ``format.bytearray.extension`` setting to change this pre-compression filename extension.

Next, decide how you want to partition the consumed messages into |az| Data Lake Storage Gen1 files. You have a few options, including the default partitioner that preserves the same partitions as in |ak|:

.. sourcecode:: properties

   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

Or, you could partition using the timestamp of the |ak| messages:

.. sourcecode:: properties

   partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
   timestamp.extractor=Record

Or, you can use the time at which the |az| Data Lake Storage Gen1 connector processes each message:

.. sourcecode:: properties

   partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
   timestamp.extractor=Wallclock

Custom partitioners are always an option, too.
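
The choice between ``Record`` and ``Wallclock`` matters for exactly-once delivery. The following minimal Python sketch assigns one record to an hourly directory using each extractor's notion of time, then replays the record later. The ``year=/month=/day=/hour=`` layout, the UTC time zone, and the dictionary shape of the record are hypothetical choices for illustration, not this connector's configured path format.

.. sourcecode:: python

   from datetime import datetime, timezone


   def hourly_path(topic, epoch_ms):
       """Hypothetical hourly directory for a timestamp in epoch milliseconds (UTC)."""
       ts = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
       return f"topics/{topic}/year={ts:%Y}/month={ts:%m}/day={ts:%d}/hour={ts:%H}"


   def path_for(record, extractor, now_ms):
       """Choose the timestamp the way each extractor would, then map it to a path.

       record: hypothetical dict with 'topic' and the Kafka record 'timestamp' (epoch ms).
       now_ms: processing time, which differs when a record is replayed.
       """
       if extractor == "Record":
           return hourly_path(record["topic"], record["timestamp"])
       if extractor == "Wallclock":
           return hourly_path(record["topic"], now_ms)
       raise ValueError(f"unsupported extractor: {extractor}")


   def epoch_ms(*args):
       """Epoch milliseconds for a UTC date and time given as datetime() arguments."""
       return int(datetime(*args, tzinfo=timezone.utc).timestamp() * 1000)


   if __name__ == "__main__":
       rec = {"topic": "datalake_topic", "timestamp": epoch_ms(2020, 1, 1, 10, 30)}
       first, replay = epoch_ms(2020, 1, 1, 10, 31), epoch_ms(2020, 1, 1, 12, 5)
       for extractor in ("Record", "Wallclock"):
           print(extractor, path_for(rec, extractor, first), path_for(rec, extractor, replay))

With ``Record``, the replayed record lands in the same directory as the first attempt. With ``Wallclock``, it moves to whatever hour the replay happens in.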

Be aware that, because the record value is an opaque binary value, |kconnect| cannot extract timestamps from fields using the ``RecordField`` option.

The |az| Data Lake Storage Gen1 connector configuration outlined above results in newline-delimited, gzipped objects in |az| Data Lake Storage Gen1 with the ``.bin.gz`` extension.

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   configuration_options