.. _hdfs3_connector:

|kconnect-long| HDFS 3 Sink Connector
=====================================

The HDFS 3 connector allows you to export data from |ak| topics to HDFS 3.x files in a variety of formats
and integrates with Hive to make data immediately available for querying with HiveQL.

.. note::

    This connector is released separately from the HDFS 2.x connector.
    If you are targeting an HDFS 2.x distribution, see the :ref:`connect_hdfs` documentation for more details.
    If you are upgrading from the :ref:`connect_hdfs`, update ``connector.class`` to
    ``io.confluent.connect.hdfs3.Hdfs3SinkConnector`` and update ``partitioner.class`` to the
    corresponding ``io.confluent.connect.storage.partitioner.*`` class. All HDFS 2.x configurations
    are applicable to this connector.
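
    For example, a sketch of how these two settings typically change during an upgrade (the HDFS 2.x
    class names are shown for illustration):

    .. sourcecode:: properties

        # HDFS 2.x connector configuration
        connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
        partitioner.class=io.confluent.connect.hdfs.partitioner.FieldPartitioner

        # equivalent HDFS 3 connector configuration
        connector.class=io.confluent.connect.hdfs3.Hdfs3SinkConnector
        partitioner.class=io.confluent.connect.storage.partitioner.FieldPartitioner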

The connector periodically polls data from |ak-tm| and writes it to HDFS. The data from each |ak|
topic is partitioned by the provided partitioner and divided into chunks. Each chunk of data is
represented as an HDFS file whose name contains the topic, the |ak| partition, and the start and end
offsets of that chunk. If a partitioner is not specified in the configuration, the default
partitioner, which preserves the |ak| partitioning, is used. The size of each data chunk is
determined by the number of records written to HDFS, time-based file rotation, and schema
compatibility.
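
For example, the following sketch shows connector properties that commonly control chunk size and
partitioning. ``flush.size`` and ``partitioner.class`` appear in the examples later in this document;
``rotate.interval.ms`` is an additional time-based rotation setting carried over from the HDFS 2.x
configuration:

.. sourcecode:: properties

    # commit a file after this many records have been written to it
    flush.size=3
    # also commit a file after this many milliseconds, regardless of record count
    rotate.interval.ms=600000
    # preserve the Kafka partitioning (this is the default partitioner)
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner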

The connector integrates with Hive. When Hive integration is enabled, the connector automatically
creates an external, partitioned Hive table for each |ak| topic and updates the table according to
the data available in HDFS.

Prerequisites
-------------

The following are required to run the |kconnect-long| HDFS 3 Sink Connector:

* |ak| Broker: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* |kconnect|: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* Java 1.8
* HDFS 3.x cluster
* Hive 3.x

This connector ships with HDFS 3.x client and Hive 3.x libraries, which are not compatible with HDFS 2.x or Hive 2.x clusters.


Install HDFS 3 Sink Connector
------------------------------

.. include:: ../../includes/connector-install.rst

.. include:: ../../includes/connector-install-hub.rst

.. codewithvars:: bash

    confluent-hub install confluentinc/kafka-connect-hdfs3:latest

.. include:: ../../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-hdfs3:1.0.0-preview

---------------------------
Install Connector Manually
---------------------------

`Download and extract the ZIP file <https://www.confluent.io/connector/kafka-connect-hdfs3/#download>`_ for your
connector and then follow the manual connector installation instructions.

Quick Start
-----------

This quick start uses the HDFS connector to export data produced by
the Avro console producer to HDFS.

Before you start |cp|, make sure Hadoop is
running locally or remotely and that you know the HDFS URL. For Hive integration, you
need to have Hive installed and to know the metastore thrift URI.

This quick start assumes that you started the required services with their default configurations;
adjust the steps below to match the configurations actually in use.

.. note:: You need to make sure the connector user has write access to the directories
   specified in ``topics.dir`` and ``logs.dir``. The default value of ``topics.dir`` is
   ``/topics`` and the default value of ``logs.dir`` is ``/logs``. If you don't specify the two
   configurations, make sure that the connector user has write access to ``/topics`` and ``/logs``.
   You may need to create ``/topics`` and ``/logs`` before running the connector, as the connector
   likely does not have write access to ``/``.
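
For example, you might pre-create these directories and grant ownership to the user that runs the
connector. This is a sketch; ``connect-user`` is a placeholder for your actual connector user:

.. sourcecode:: bash

    # create the default data and WAL directories in HDFS
    hadoop fs -mkdir -p /topics /logs
    # give the connector user write access ("connect-user" is a placeholder)
    hadoop fs -chown connect-user /topics /logs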

This quick start assumes that security is not configured for HDFS and the Hive metastore. To make the necessary
security configurations, see the :ref:`Secure HDFS and Hive Metastore <hdfs3_secure_hdfs_hive_metastore>` section.

#.  Install the connector through the :ref:`confluent_hub_client`.

    .. codewithvars:: bash

       # run from your Confluent Platform installation directory
       confluent-hub install confluentinc/kafka-connect-hdfs3:latest

#.  Start |cp|.

    .. codewithvars:: bash

       |confluent_start|

#.  `Produce <https://docs.confluent.io/current/cli/command-reference/confluent-produce.html>`_ test Avro data to the ``test_hdfs`` topic in |ak|.

    .. codewithvars:: bash

       ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_hdfs \
       --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

       # paste each of these messages

       {"f1": "value1"}
       {"f1": "value2"}
       {"f1": "value3"}

#.  Create a ``hdfs3-sink.json`` file with the following contents:

    .. codewithvars:: json

        {
          "name": "hdfs3-sink",
          "config": {
            "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
            "tasks.max": "1",
            "topics": "test_hdfs",
            "hdfs.url": "hdfs://localhost:9000",
            "flush.size": "3",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url":"http://localhost:8081",
            "confluent.topic.bootstrap.servers": "localhost:9092",
            "confluent.topic.replication.factor": "1"
          }
        }

    .. note::

        The first few settings are common settings you specify for all connectors. The ``topics``
        parameter specifies the topics to export data from, in this case ``test_hdfs``. ``hdfs.url``
        specifies the HDFS cluster that data is written to; set it according to your environment.
        ``flush.size`` specifies the number of records the connector writes before committing a file.
        For high-availability HDFS deployments, set ``hadoop.conf.dir`` to a directory that includes
        ``hdfs-site.xml``. After ``hdfs-site.xml`` is in place and ``hadoop.conf.dir``
        has been set, ``hdfs.url`` may be set to the namenode's nameservice ID, such as ``nameservice1``.

#.  Load the HDFS3 Sink Connector.

    .. include:: ../../../includes/confluent-local-consume-limit.rst

    .. codewithvars:: bash

       |confluent_load| hdfs3-sink|dash| -d hdfs3-sink.json

#.  Confirm that the connector is in a ``RUNNING`` state.

    .. codewithvars:: bash

        |confluent_status| hdfs3-sink

#.  Validate that the Avro data is in HDFS.

    .. codewithvars:: bash

       # list files in partition 0
       hadoop fs -ls /topics/test_hdfs/partition=0

       # the following should appear in the list
       # /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro

    .. note:: The file name is encoded as ``topic+kafkaPartition+startOffset+endOffset.format``.


#.  Extract the contents of the file using
    the `avro-tools-1.8.2.jar <https://repo1.maven.org/maven2/org/apache/avro/avro-tools/1.8.2/avro-tools-1.8.2.jar>`_.

    .. sourcecode:: bash

       # substitute "<namenode>" for the HDFS name node hostname
       hadoop jar avro-tools-1.8.2.jar tojson \
       hdfs://<namenode>/topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro

#.  If you experience issues with the previous step, first copy the Avro file
    from HDFS to the local filesystem and try again using ``java`` locally.

    .. sourcecode:: bash

       hadoop fs -copyToLocal /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro \
       /tmp/test_hdfs+0+0000000000+0000000002.avro

       java -jar avro-tools-1.8.2.jar tojson /tmp/test_hdfs+0+0000000000+0000000002.avro

       # expected output
       {"f1":"value1"}
       {"f1":"value2"}
       {"f1":"value3"}


.. note::

   If you want to run the quick start with Hive integration, add the following configurations to ``hdfs3-sink.json``:
   
   .. highlight:: none
      
   ::

      "hive.integration": "true",
      "hive.metastore.uris": "<thrift uri to your Hive metastore>"
      "schema.compatibility": "BACKWARD"

   After the connector finishes ingesting data to HDFS, you can use Hive to check the data:
   
   ::
      
      beeline -e "SELECT * FROM test_hdfs;"

.. note:: If the ``hive.metastore.uris`` setting is empty, an embedded Hive
          metastore is created in the directory the connector is started in.
          Start Hive in that specific directory to query the data.

Features
--------

The HDFS 3 connector offers a variety of features:

- :ref:`hdfs3_exactly_once_delivery`
- :ref:`hdfs3_data_formats`
- :ref:`hdfs3_partitioner_strategies`
- :ref:`hdfs3_hive_integration`
- :ref:`hdfs3_schema_evolution`
- :ref:`hdfs3_secure_hdfs_hive_metastore`

.. _hdfs3_exactly_once_delivery:

---------------------
Exactly Once Delivery
---------------------

The connector uses a write-ahead log to ensure each record is written to HDFS exactly once.
The connector also manages offsets by encoding the |ak| offset information into the HDFS
file names, so that it can resume from the last committed offsets after failures and task restarts.

.. _hdfs3_data_formats:

-----------------------
Extensible Data Formats
-----------------------

Out of the box, the connector supports writing data to HDFS in Avro and Parquet format.
However, you can write other formats to HDFS by extending the ``Format`` class.

You must configure the ``format.class`` and ``partitioner.class`` if you want to write other
formats to HDFS or use other partitioners. The following example configurations show how to
write Parquet format and use the field partitioner:

.. sourcecode:: properties

    format.class=io.confluent.connect.hdfs3.parquet.ParquetFormat
    partitioner.class=io.confluent.connect.storage.partitioner.FieldPartitioner

.. note::

    When using the field partitioner, you must specify the ``partition.field.name``
    configuration to specify the field name of the record that
    is used for partitioning.
    
The following example shows how to use Parquet format and the field partitioner.

#.  :ref:`Produce <confluent_local_produce>` test Avro data to the ``parquet_field_hdfs`` topic in |ak|.

    .. codewithvars:: bash

       ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic parquet_field_hdfs \
       --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"}, {"name":"address","type":"string"}, {"name" : "age", "type" : "int"}, {"name" : "is_customer", "type" : "boolean"}]}'

       # paste each of these messages

       {"name":"Peter", "address":"Mountain View", "age":27, "is_customer":true}
       {"name":"David", "address":"Mountain View", "age":37, "is_customer":false}
       {"name":"Kat", "address":"Palo Alto", "age":30, "is_customer":true}
       {"name":"David", "address":"San Francisco", "age":35, "is_customer":false}
       {"name":"Leslie", "address":"San Jose", "age":26, "is_customer":true}
       {"name":"Dani", "address":"Seattle", "age":32, "is_customer":false}
       {"name":"Kim", "address":"San Jose", "age":30, "is_customer":true}
       {"name":"Steph", "address":"Seattle", "age":31, "is_customer":false}

#.  Create a ``hdfs3-parquet-field.json`` file with the following contents:

    .. codewithvars:: json

        {
            "name": "hdfs3-parquet-field",
            "config": {
                "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
                "tasks.max": "1",
                "topics": "parquet_field_hdfs",
                "hdfs.url": "hdfs://localhost:9000",
                "flush.size": "3",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                "value.converter": "io.confluent.connect.avro.AvroConverter",
                "value.converter.schema.registry.url":"http://localhost:8081",
                "confluent.topic.bootstrap.servers": "localhost:9092",
                "confluent.topic.replication.factor": "1",
                
                "format.class":"io.confluent.connect.hdfs3.parquet.ParquetFormat",
                "partitioner.class":"io.confluent.connect.storage.partitioner.FieldPartitioner",
                "partition.field.name":"is_customer"
            }
        }
        
#.  Load the HDFS3 Sink Connector.

    .. codewithvars:: bash

       |confluent_load| hdfs3-parquet-field|dash| -d hdfs3-parquet-field.json

#.  Confirm that the connector is in a ``RUNNING`` state.

    .. codewithvars:: bash

        |confluent_status| hdfs3-parquet-field

#.  Validate that the Parquet data is in HDFS.

    .. codewithvars:: bash

       # list files in partition called is_customer=true
       hadoop fs -ls /topics/parquet_field_hdfs/is_customer=true

       # the following should appear in the list
       # /topics/parquet_field_hdfs/is_customer=true/parquet_field_hdfs+0+0000000000+0000000002.parquet
       # /topics/parquet_field_hdfs/is_customer=true/parquet_field_hdfs+0+0000000004+0000000004.parquet


#.  Extract the contents of the file using
    the `parquet-tools-1.9.0.jar <https://repo1.maven.org/maven2/org/apache/parquet/parquet-tools/1.9.0/parquet-tools-1.9.0.jar>`_.

    .. sourcecode:: bash

       # substitute "<namenode>" for the HDFS name node hostname
       hadoop jar parquet-tools-1.9.0.jar cat --json \
       hdfs://<namenode>/topics/parquet_field_hdfs/is_customer=true/parquet_field_hdfs+0+0000000000+0000000002.parquet
       
       
#.  If you experience issues with the previous step, first copy the Parquet file
    from HDFS to the local filesystem and try again using ``java`` locally.

    .. sourcecode:: bash

       hadoop fs -copyToLocal /topics/parquet_field_hdfs/is_customer=true/parquet_field_hdfs+0+0000000000+0000000002.parquet \
       /tmp/parquet_field_hdfs+0+0000000000+0000000002.parquet

       java -jar parquet-tools-1.9.0.jar cat --json /tmp/parquet_field_hdfs+0+0000000000+0000000002.parquet

       # expected output
       {"name":"Peter","address":"Mountain View","age":27,"is_customer":true}
       {"name":"Kat","address":"Palo Alto","age":30,"is_customer":true}


.. _hdfs3_partitioner_strategies:

---------------------------------
Extensible Partitioner Strategies
---------------------------------

The connector supports a variety of partitioners, and you can also implement your own
partitioner by extending the ``io.confluent.connect.storage.partitioner.Partitioner`` class.
You can also customize existing partitioners such as the time-based partitioner by extending
the ``io.confluent.connect.storage.partitioner.TimeBasedPartitioner`` class.

The following partitioners are available by default:

* ``DefaultPartitioner``: The default partitioner reuses the |ak| record's partition, preserving the original |ak| partitioning in HDFS.
* ``TimeBasedPartitioner``: The time-based partitioners allow partitions to be created based on a set time interval. The ``HourlyPartitioner`` and ``DailyPartitioner`` preconfigure the intervals, but this partitioner allows full control over the partition duration (see the example configuration after this list).
* ``HourlyPartitioner``: A subclass of the ``TimeBasedPartitioner`` that creates partitions on an hourly basis.
* ``DailyPartitioner``: A subclass of the ``TimeBasedPartitioner`` that creates partitions on a daily basis.
* ``FieldPartitioner``: A partitioner that uses record values of the configured ``partition.field.name`` to determine partitions.
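
For example, a time-based partitioner that creates a new partition directory for every hour of data
might be configured as follows. This is a minimal sketch; the values are illustrative and should be
adjusted to your environment:

.. sourcecode:: properties

    partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
    # length of each partition directory, in milliseconds (one hour)
    partition.duration.ms=3600000
    # directory layout created under topics.dir, formatted from the record timestamp
    path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
    locale=en-US
    timezone=UTC
    # which timestamp to partition on (alternatives include Wallclock and RecordField)
    timestamp.extractor=Record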

.. _hdfs3_hive_integration:

----------------
Hive Integration
----------------

The connector supports Hive integration out of the box.
When enabled, the connector automatically creates a Hive external partitioned table for each topic exported to HDFS.

At a minimum, you need to specify ``hive.integration``, ``hive.metastore.uris`` and ``schema.compatibility`` when integrating Hive.

Here is an example configuration:

.. sourcecode:: properties

    hive.integration=true
    # use the FQDN for the host part of the metastore URI
    hive.metastore.uris=thrift://localhost:9083
    schema.compatibility=BACKWARD

You should adjust ``hive.metastore.uris`` according to your Hive configurations.

Because connector tasks are long running, the connections to the Hive metastore are kept open until the
tasks are stopped. In the default Hive configuration, reconnecting to the Hive metastore creates a new
connection. When the number of tasks is large, retries can cause the number of open connections
to exceed the maximum number of connections allowed by the operating system. For this reason, you should
set ``hcatalog.hive.client.cache.disabled`` to ``true`` in ``hive-site.xml``.
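
A sketch of how this setting might look in ``hive-site.xml``, using the standard Hadoop-style XML
configuration format:

.. sourcecode:: xml

    <configuration>
      <!-- recommended when running many connector tasks against the metastore -->
      <property>
        <name>hcatalog.hive.client.cache.disabled</name>
        <value>true</value>
      </property>
    </configuration>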

.. important::

    If you don't specify ``hive.metastore.uris``, the connector uses a local Derby metastore
    in the directory where the connector is running. You need to run Hive from this directory
    to see the Hive metadata changes.

To support schema evolution, set ``schema.compatibility`` to be ``BACKWARD``, ``FORWARD`` or
``FULL``. This ensures that Hive can query the data written to HDFS with different schemas using the
latest Hive table schema.

.. _hdfs3_schema_evolution:

----------------
Schema Evolution
----------------

The connector supports schema evolution and varying schema compatibility levels.
When the connector observes a schema change, it projects to the proper schema according
to the ``schema.compatibility`` configuration.

If Hive integration is enabled, you must set ``schema.compatibility`` to ``BACKWARD``,
``FORWARD``, or ``FULL``. This ensures that the Hive table schema is able to query all the data under
a topic that was written with different schemas. If ``schema.compatibility`` is set to ``BACKWARD`` or
``FULL``, the Hive table schema for a topic is equivalent to the latest schema in the HDFS files
under that topic, and that schema can query all of the topic's data. If ``schema.compatibility`` is
set to ``FORWARD``, the Hive table schema of a topic is equivalent to the oldest schema of the HDFS
files under that topic, and that schema can query all of the topic's data.

The following are descriptions of each compatibility type:

* **NO Compatibility**: By default, ``schema.compatibility`` is set to ``NONE``. In this case,
  the connector ensures that each file written to HDFS has the proper schema. When the connector
  observes a schema change in data, it commits the current set of files for the affected topic
  partitions and writes the data with the new schema in new files.

* **BACKWARD Compatibility**: If schema evolution is backward compatible, the connector can always
  use the latest schema to query all the data uniformly. For example, removing fields is a backward
  compatible change to a schema. When the connector encounters records written with the old schema (that
  contain the removed fields), it ignores them. Adding a field with a default value is also backward
  compatible (see the example schema after this list).

  If ``schema.compatibility`` is set to ``BACKWARD``, the connector keeps track
  of the latest schema used in writing data to HDFS, and if a data record with a schema version
  later than the current latest schema arrives, the connector commits the current set of files
  and writes the data record with the new schema to new files. For data records arriving later
  with a schema of an earlier version, the connector projects the data record to the latest schema
  before writing to the same set of files in HDFS.

* **FORWARD Compatibility**: If schema evolution is forward compatible, the connector
  uses the oldest schema to query all the data uniformly. Removing a field that had a default value
  is forward compatible, since the old schema uses the default value when the field is missing.

  If ``FORWARD`` is specified in the ``schema.compatibility`` parameter, the connector projects the data to
  the oldest schema before writing to the same set of files in HDFS.

* **FULL Compatibility**: Full compatibility means that old data can be read with the new schema
  and new data can also be read with the old schema.

  If ``schema.compatibility`` is set to ``FULL``, the connector performs the same action
  as for ``BACKWARD``.
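
For example, starting from the ``myrecord`` value schema used in the quick start, adding a field with
a default value is a backward-compatible change. A sketch of the evolved schema follows (the added
``f2`` field is hypothetical):

.. sourcecode:: json

    {
      "type": "record",
      "name": "myrecord",
      "fields": [
        {"name": "f1", "type": "string"},
        {"name": "f2", "type": "string", "default": ""}
      ]
    }

With ``schema.compatibility`` set to ``BACKWARD``, records written with the original schema are
projected to this latest schema (filling in the default value for ``f2``) before being written, so the
Hive table and all HDFS files for the topic can be queried uniformly.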

.. _hdfs3_secure_hdfs_hive_metastore:

------------------------------
Secure HDFS and Hive Metastore
------------------------------

The connector supports Kerberos authentication for working with both secure HDFS and a secure Hive metastore.

To work with secure HDFS and Hive metastore, you need to specify ``hdfs.authentication.kerberos``,
``connect.hdfs.principal``, ``connect.hdfs.keytab``, and ``hdfs.namenode.principal``:

.. sourcecode:: properties

  hdfs.authentication.kerberos=true
  # Replace _HOST with the actual FQDN of the Namenode host
  connect.hdfs.principal=connect-hdfs/_HOST@YOUR-REALM.COM
  connect.hdfs.keytab=path to the connector keytab
  hdfs.namenode.principal=namenode principal

You need to create the |kconnect| principals and keytab files via Kerberos and distribute the
keytab file to all hosts running the connector. Make sure that only the connector user
has read access to the keytab file. Currently, the connector requires that the principal and
the keytab path be the same on all hosts running the connector.
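
The exact commands depend on your Kerberos distribution. The following is a rough sketch using MIT
Kerberos ``kadmin``; the principal name, realm, hostname, keytab path, and ``connect-user`` are all
placeholders:

.. sourcecode:: bash

    # create a principal for the connector (run with Kerberos admin credentials)
    kadmin -q "addprinc -randkey connect-hdfs/connect-host.example.com@YOUR-REALM.COM"

    # export the principal's key to a keytab file
    kadmin -q "ktadd -k /etc/security/keytabs/connect-hdfs.keytab connect-hdfs/connect-host.example.com@YOUR-REALM.COM"

    # copy the keytab to the same path on every host running the connector,
    # then restrict read access to the connector user
    chown connect-user /etc/security/keytabs/connect-hdfs.keytab
    chmod 400 /etc/security/keytabs/connect-hdfs.keytab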

.. note::

    When security is enabled, you must use the FQDN in the host portion of ``hdfs.url`` and ``hive.metastore.uris``.


.. toctree::
   :maxdepth: 1
   :hidden:

   configuration_options
   changelog