Kafka Connect HDFS 3 Sink Connector

The HDFS 3 connector allows you to export data from Kafka topics to HDFS 3.x files in a variety of formats and integrates with Hive to make data immediately available for querying with HiveQL.


This connector is released separately from the HDFS 2.x connector. If you are targeting an HDFS 2.x distribution, see the Kafka Connect HDFS 2 Sink Connector documentation for more details. If you are upgrading from the Kafka Connect HDFS 2 Sink Connector, update connector.class to io.confluent.connect.hdfs3.Hdfs3SinkConnector. All HDFS 2.x configurations are applicable in this connector.

The connector periodically polls data from Apache Kafka® and writes them to HDFS. The data from each Kafka topic is partitioned by the provided partitioner and divided into chunks. Each chunk of data is represented as an HDFS file with topic, Kafka partition, start and end offsets of this data chunk in the file name. If a partitioner is not specified in the configuration, the default partitioner which preserves the Kafka partitioning is used. The size of each data chunk is determined by the number of records written to HDFS, the time written to HDFS, and schema compatibility.

The HDFS connector integrates with Hive and when it is enabled, the connector automatically creates an external Hive partitioned table for each Kafka topic and updates the table according to the available data in HDFS.


The following are required to run the Kafka Connect HDFS 3 Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Java 1.8
  • HDFS 3.x cluster

Install HDFS 3 Sink Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run this command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will be run.

confluent-hub install confluentinc/kafka-connect-hdfs3:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-hdfs3:1.0.0-preview

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector inst

Quick Start

This quick start uses the HDFS connector to export data produced by the Avro console producer to HDFS.

Before you start Confluent Platform, make sure Hadoop is running locally or remotely and that you know the HDFS URL. For Hive integration, you need to have Hive installed and to know the metastore thrift URI.

This quick start assumes that you started the required services with the default configurations and you should make necessary changes according to the actual configurations used.


You need to make sure the connector user has write access to the directories specified in topics.dir and logs.dir. The default value of topics.dir is /topics and the default value of logs.dir is /logs. If you don't specify the two configurations, make sure that the connector user has write access to /topics and /logs. You may need to create /topics and /logs before running the connector, as the connector likely does not have write access to /.

This quick start assumes that security is not configured for HDFS and Hive metastore. To make the necessary security configurations, see the Secure HDFS and Hive Metastore section.

  1. Install the connector through the Confluent Hub Client.

    # run from your Confluent Platform installation directory
    confluent-hub install confluentinc/kafka-connect-hdfs3:latest
  2. Start Confluent Platform.

    confluent local start
  3. Produce test Avro data to the test_hdfs topic in Kafka.

    ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_hdfs \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
     # paste each of these messages
     {"f1": "value1"}
     {"f1": "value2"}
     {"f1": "value3"}
  4. Create a hdfs3-sink.json file with the following contents:

      "name": "hdfs-sink",
      "config": {
        "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
        "tasks.max": "1",
        "topics": "test_hdfs",
        "hdfs.url": "hdfs://localhost:9000",
        "flush.size": "3",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1"


    The first few settings are common settings you'll specify for all connectors. The topics parameter specifies the topics to export data from; in this case test_hdfs. hdfs.url specifies the HDFS having data written to it. You should set this according to your configuration. flush.size specifies the number of records the connector needs to write before invoking file commits. For high availability HDFS deployments, set hadoop.conf.dir to a directory that includes hdfs-site.xml. After hdfs-site.xml is in place and hadoop.conf.dir has been set, hdfs.url may be set to the namenodes nameservice id, such as nameservice1.

  5. Load the HDFS3 Sink Connector.


    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local load hdfs3 -- -d hdfs3-sink.json
  6. Confirm that the connector is in a RUNNING state.

    confluent local status hdfs3
  7. Validate that the Avro data is in HDFS.

    # list files in partition 0
    hadoop fs -ls /topics/test_hdfs/partition=0
    # the following should appear in the list
    # /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro


    The file name is encoded as topic+kafkaPartition+startOffset+endOffset.format.

  8. Extract the contents of the file using the avro-tools-1.8.2.jar available here.

    # substitute "<namenode>" for the HDFS name node hostname
    hadoop jar avro-tools-1.8.2.jar tojson \
  9. If you experience issues with the previous step, first copy the Avro file from HDFS to the local filesystem and try again with java.

    hadoop fs -copyToLocal /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro \
    java -jar avro-tools-1.8.2.jar tojson /tmp/test_hdfs+0+0000000000+0000000002.avro
    # expected output


If you want to run the quick start with Hive integration, add the following configurations to hdfs-sink.json:

"hive.integration": "true", "hive.metastore.uris": "<thrift uri to your Hive metastore>" "schema.compatibility": "BACKWARD"

After the connector finishes ingesting data to HDFS, you can use Hive to check the data:

$hive>SELECT * FROM test_hdfs;


If the hive.metastore.uris setting is empty, an embedded Hive metastore is created in the directory the connector is started in. Start Hive in that specific directory to query the data.


The HDFS 3 connector offers a variety of features:

Exactly Once Delivery

The connector uses a write-ahead log to ensure each record is written to HDFS exactly once. Also, the connector manages offsets by encoding the Kafka offset information into the HDFS file so that it can start from the last committed offsets in case of failures and task restarts.

Extensible Data Formats

Out of the box, the connector supports writing data to HDFS in Avro and Parquet format. However, you can write other formats to HDFS by extending the Format class.

You must configure the format.class and partitioner.class if you want to write other formats to HDFS or use other partitioners. The following example configurations show how to write Parquet format and use the hourly partitioner:



When using the field partitioner, you must specify the partition.field.name configuration to specify the field name of the record that is used for partitioning.

Extensible Partitioner Strategies

The connector supports a variety of partitions but you can also implement your own partitioner by extending the io.confluent.connect.storage.partitioner.Partitioner class. You can also customize existing partitioners such as the time-based partitioner by extending the io.confluent.connect.storage.partitioner.TimeBasedPartitioner class.

The following partitioners are available by default:

  • DefaultPartitioner: The default partition reuses the Kafka record's partition when encoding the partition.
  • TimeBasedPartitioner: The time-based partitioners allow for partitions to be created based on a set time interval. The HourlyPartitioner and DailyPartitioner preconfigure the intervals, but this partitioner allows full control over the partition duration.
  • HourlyPartitioner: A subclass of the TimeBasedPartitioner that creates partitions on an hourly basis.
  • DailyPartitioner: A subclass of the TimeBasedPartitioner that creates partitions on a daily basis.
  • FieldPartitioner: A partitioner that uses record values of the configured partition.field.name to determine partitions.

Hive Integration

The connector supports Hive integration out of the box. When enabled, the connector automatically creates a Hive external partitioned table for each topic exported to HDFS.

At a minimum, you need to specify hive.integration, hive.metastore.uris and schema.compatibility when integrating Hive.

Here is an example configuration:

hive.metastore.uris=thrift://localhost:9083 # FQDN for the host part

You should adjust hive.metastore.uris according to your Hive configurations.

As connector tasks are long running, the connections to the Hive metastore are kept open until tasks are stopped. In the default Hive configuration, reconnecting to the Hive metastore creates a new connection. When the number of tasks is large, it is possible that the retries can cause the number of open connections to exceed the max allowed connections in the operating system. For this reason, you should set hcatalog.hive.client.cache.disabled to true in hive.xml.


If you don't specify the hive.metastore.uris, the connector uses a local metastore with Derby in the directory running the connector. You need to run Hive in this directory in order to see the Hive metadata change.

To support schema evolution, set schema.compatibility to be BACKWARD, FORWARD or FULL. This ensures that Hive can query the data written to HDFS with different schemas using the latest Hive table schema.

Schema Evolution

The connector supports schema evolution and varying schema compatibility levels. When the connector observes a schema change, it projects to the proper schema according to the schema.compatibility configuration.

If Hive integration is enabled, you must specify the schema.compatibility to be BACKWARD, FORWARD or FULL. This ensures that the Hive table schema is able to query all the data under a topic written with different schemas. If the schema.compatibility is set to BACKWARD or FULL, the Hive table schema for a topic is equivalent to the latest schema in the HDFS files under that topic that can query the whole data of that topic. If the schema.compatibility is set to FORWARD, the Hive table schema of a topic is equivalent to the oldest schema of the HFDS files under that topic that can query the whole data of that topic.

The following are descriptions of each compatibility type:

  • NO Compatibility: By default, the schema.compatibility is set to NONE. In this case, the connector ensures that each file written to HDFS has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with new schema in new files.

  • BACKWARD Compatibility: If schema evolution is backward compatible, the connector can always use the latest schema to query all the data uniformly. For example, removing fields is a backward compatible change to a schema. When the connector encounters records written with the old schema (that contain the removed fields), it ignores them. Adding a field with a default value is also backward compatible.

    If BACKWARD is specified in the schema.compatibility, the connector keeps track of the latest schema used in writing data to HDFS, and if a data record with a schema version larger than current latest schema arrives, the connector commits the current set of files and writes the data record with new schema to new files. For data records arriving at a later time with schema of an earlier version, the connector projects the data record to the latest schema before writing to the same set of files in HDFS.

  • FORWARD Compatibility: If schema evolution is forward compatible, the connector uses the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema uses the default value when the field is missing.

    If FORWARD is specified in the schema.compatibility parameter, the connector projects the data to the oldest schema before writing to the same set of files in HDFS.

  • Full Compatibility: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema.

    If FULL is specified in the schema.compatibility, the connector performs the same action as BACKWARD.

Secure HDFS and Hive Metastore

The connector supports Kerberos authentication to support both secure HDFS and Hive metastore.

To work with secure HDFS and Hive metastore, you need to specify hdfs.authentication.kerberos, connect.hdfs.principal, connect.keytab, hdfs.namenode.principal:

# Replace _HOST with the actual FQDN of the Namenode host
connect.hdfs.keytab=path to the connector keytab
hdfs.namenode.principal=namenode principal

You need to create the Kafka connect principals and keytab files via Kerberos and distribute the keytab file to all hosts running the connector. Make sure that only the connector user has read access to the keytab file. Currently, the connector requires that the principal and the keytab path to be the same on all the hosts running the connector.


When security is enabled, you need to use FQDN from the host portion of hdfs.url and hive.metastore.uris.