HDFS 3 Source Connector for Confluent Platform

The Kafka Connect HDFS 3 Source connector reads data exported to HDFS 3 by the Kafka Connect HDFS 3 Sink connector and publishes it back to a Kafka topic. Depending on the format and partitioner used to write the data to HDFS 3, this connector can write to the destination topic using the same partitions as the original messages exported by the HDFS 3 Sink connector, and maintain the same message order. The connector selects folders based on the partitioner configuration and reads each folder’s HDFS 3 objects in alphabetical order. Each record is read based on the selected format. The configuration mirrors that of the Kafka Connect HDFS 3 Sink connector, so it should be possible to reuse the original sink configuration with only minor changes.

Important

  • You can create topics manually in the destination Kafka cluster with the correct number of partitions before running the source connector. If the topics do not exist, Connect relies on auto topic creation for source connectors (see Configuring Auto Topic Creation for Source Connectors), and the number of partitions is based on the Kafka broker defaults. If the destination cluster has more partitions, the extra partitions are not used. If the destination cluster has fewer partitions, the connector task throws an exception and stops as soon as it tries to write to a Kafka partition that does not exist.
  • This connector is released separately from the HDFS 2.x connector. If you are targeting an HDFS 2.x distribution, see the HDFS 2 Source connector for Confluent Platform documentation for more details.

Be aware of the following connector actions:

  • The connector ignores any HDFS 3 object with a name that does not start with the configured topics directory. This name is "/topics/" by default.
  • The connector ignores any HDFS 3 object that is below the topics directory but has an extension that does not match the configured format. For example, a JSON file is ignored when format.class is set for Avro files.
  • The connector stops and fails if the HDFS 3 object’s name does not match the expected format or is in an unexpected location.

Avoid the following configuration issues:

  • A file with the correct extension and a valid name format (for example, <topic>+<partition>+<starting-offset>+<ending-offset>.<extension>) that is placed in a folder of a different topic is read normally and written to the topic defined by its filename.
  • If a field partitioner is incorrectly configured to match the expected folder, it can break the ordering guarantees of the HDFS 3 sink that used a deterministic sink partitioner.
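The file-selection rules above can be sketched as a small classifier. This is a hypothetical illustration, not the connector's code: `classify`, `SinkFile`, and the regular expression are names introduced here, and the sketch assumes topic names contain no `+`. It returns "invalid" for a badly named file and leaves the decision to stop or skip to the caller. Note how a correctly named file in another topic's folder is still attributed to the topic in its own filename.

```python
import re
from typing import NamedTuple

# Sink-style file name: <topic>+<partition>+<startOffset>+<endOffset>.<extension>
# Assumption: topic names contain no '+' and offsets are decimal digits.
FILE_NAME = re.compile(
    r"^(?P<topic>[^+/]+)\+(?P<partition>\d+)\+(?P<start>\d+)\+(?P<end>\d+)\.(?P<ext>[A-Za-z0-9]+)$"
)


class SinkFile(NamedTuple):
    topic: str
    partition: int
    start_offset: int
    end_offset: int


def classify(path: str, topics_dir: str = "/topics/", extension: str = "avro"):
    """Return ("ignore", reason), ("invalid", path) or ("read", SinkFile)."""
    if not path.startswith(topics_dir):
        return ("ignore", "outside topics directory")
    name = path.rsplit("/", 1)[-1]
    if not name.endswith("." + extension):
        return ("ignore", "extension does not match format")
    match = FILE_NAME.match(name)
    if match is None:
        return ("invalid", path)
    # Destination topic and partition come from the file name, not from the folder.
    return ("read", SinkFile(match["topic"], int(match["partition"]),
                             int(match["start"]), int(match["end"])))


# A file misplaced under another topic's folder is still read as test_hdfs:
print(classify("/topics/other_topic/partition=0/test_hdfs+0+0000000000+0000000002.avro"))
```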

Features

The HDFS 3 Source connector offers a variety of features:

Pluggable data format with or without schema

Out of the box, the connector supports reading data from HDFS 3 in Avro and JSON format. Besides records with schema, the connector supports importing plain JSON records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the Format interface.

At-least-once delivery

In the event of a task failure, the connector guarantees that no messages are lost. The connector manages offsets so that it can resume from the last committed offsets after failures and task restarts.

Multiple tasks

The HDFS 3 Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when multiple files need to be parsed.

Matching source partitioning

Messages are written back to the same partition of the Kafka topic that they were in when the HDFS 3 Sink connector exported them.

Source partition ordering

The connector reads records back in time order within each topic-source partition if the DefaultPartitioner or a TimeBasedPartitioner is used. If a FieldPartitioner is used, the order of these messages cannot be guaranteed.
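Because the connector reads each folder's files in alphabetical order, that order matches offset order only when offsets in the filenames have a fixed width. The sketch below assumes the sink zero-pads offsets, which the HDFS sink connectors do by default; the helper names and file lists are hypothetical and only illustrate why padding matters.

```python
def start_offset(name: str) -> int:
    """Start offset from <topic>+<partition>+<startOffset>+<endOffset>.<extension>."""
    return int(name.rsplit(".", 1)[0].split("+")[2])


def alphabetical_is_offset_order(names: list) -> bool:
    """True if visiting the names alphabetically also visits them in offset order."""
    offsets = [start_offset(n) for n in sorted(names)]
    return offsets == sorted(offsets)


# Hypothetical files from one partition folder, listed in arbitrary order.
PADDED = [
    "test_hdfs+0+0000001000+0000001999.avro",
    "test_hdfs+0+0000000000+0000000199.avro",
    "test_hdfs+0+0000000200+0000000999.avro",
]
# Same ranges without padding: "1000" sorts before "200" alphabetically.
UNPADDED = [
    "test_hdfs+0+1000+1999.avro",
    "test_hdfs+0+0+199.avro",
    "test_hdfs+0+200+999.avro",
]

print(alphabetical_is_offset_order(PADDED), alphabetical_is_offset_order(UNPADDED))
```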

Pluggable partitioner

The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class. Additionally, you can customize time-based partitioning by extending the TimeBasedPartitioner class.

Tip

By default, connectors inherit the partitioner used for the Kafka topic. If you create a custom partitioner for a connector, you must place it in the connector’s /lib folder.

You can also put partitioners in a common location of choice. If you choose this option, you must add a symlink to the location from each connector’s /lib folder. For example, you would place a custom partitioner in the path share/confluent-hub-components/partitioners and then add the symlink share/confluent-hub-components/kafka-connect-s3/lib/partitioners -> ../../partitioners.

Limitations

For the TimeBasedPartitioner, the capacity to scale the connector across various time ranges is limited. Currently, the connector does not support processing data that spans several years.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

For license properties, see Confluent Platform license. For information about the license topic, see Confluent License Properties.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for HDFS 3 Source Connector for Confluent Platform.

Install HDFS 3 Source Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • Install the connector on every machine where Connect will run.

  • Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.

  • Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.

  • Java 8+. Note that Java 8 is deprecated in versions 7.2 and later of Confluent Platform. For more details, view Java compatibility with Confluent Platform by version.

    Java 17 users must add the following JVM flag to enable the Hive integration feature:

    --add-opens java.base/java.net=ALL-UNNAMED
    
  • An installation of the latest connector version.

Install the connector using Confluent CLI

To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-hdfs3-source:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-hdfs3-source:2.6.0

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Quick Start

This quick start uses the HDFS 3 Source connector to read Avro data that the HDFS 3 Sink connector wrote to HDFS 3 and publish it to a Kafka topic. Before you start the connector, make sure Hadoop is running locally or remotely and that you know the HDFS URL.

This quick start assumes that you started the required services with the default configurations. If your configuration differs, adjust the values in the following steps accordingly.

Note

Make sure the connector user has read access to the directories specified in topics.dir. The default value of topics.dir is /topics.

The following example uses the HDFS 3 Sink connector to write a file from the Kafka topic named test_hdfs to HDFS 3. Then, the HDFS 3 Source connector loads that Avro file from HDFS 3 into the Kafka topic named copy_of_test_hdfs.


Start the Hadoop 3.x cluster.

start-dfs.sh
start-yarn.sh

Follow the instructions in the HDFS 3 Sink connector documentation to write the data used in the rest of this example to HDFS 3.

Start Confluent Platform.

confluent local services start

Property-based example

Create a configuration file for the connector. This file is included with the connector in etc/kafka-connect-hdfs3-source/hdfs3-source.properties. This configuration is typically used with standalone workers.

name=hdfs3-source
connector.class=io.confluent.connect.hdfs3.Hdfs3SourceConnector
tasks.max=1
hdfs.url=hdfs://localhost:9000
format.class=io.confluent.connect.hdfs3.format.avro.AvroFormat
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

Edit hdfs3-source.properties to add the following properties:

transforms=AddPrefix
transforms.AddPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.AddPrefix.regex=.*
transforms.AddPrefix.replacement=copy_of_$0

Important

Adding these properties renames the output topic of the messages to copy_of_test_hdfs. This prevents a continuous feedback loop of messages when both the sink and source connectors operate on the same Kafka topic.
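To see what the AddPrefix transform does to a topic name, here is a rough Python analogue of RegexRouter's behavior: the regex must match the whole topic name, and then the first match is replaced, with `$0` referring to the entire match. `regex_router` is a hypothetical helper for illustration only, not part of Kafka Connect.

```python
import re


def regex_router(topic: str, regex: str, replacement: str) -> str:
    """Approximate RegexRouter: rename only if the whole topic matches the regex."""
    pattern = re.compile(regex)
    if pattern.fullmatch(topic) is None:
        return topic  # non-matching topics pass through unchanged
    # Java-style group references ($0, $1, ...) become \g<0>, \g<1>, ... in Python.
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    # Replace only the first match, like Java's Matcher.replaceFirst.
    return pattern.sub(py_replacement, topic, count=1)


print(regex_router("test_hdfs", ".*", "copy_of_$0"))
```

Because the output topic differs from test_hdfs, a sink connector that reads test_hdfs does not pick up the records the source connector produces.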

Load the HDFS 3 Source connector.

Caution

You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local services connect connector load hdfs3-source --config hdfs3-source.properties

Important

Don’t use the Confluent CLI in production environments.

Confirm that the connector is in a RUNNING state.

confluent local services connect connector status hdfs3-source

Validate that the Avro data is in the Kafka topic.

kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic copy_of_test_hdfs \
--from-beginning | jq '.'

The response should be three records as shown below.

{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}

REST-based example

Use this setting with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the curl command that follows to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST API documentation.

{
  "name" : "hdfs3-source",
  "config" : {
    "connector.class" : "io.confluent.connect.hdfs3.Hdfs3SourceConnector",
    "tasks.max" : "1",
    "hdfs.url" : "hdfs://localhost:9000/",
    "format.class" : "io.confluent.connect.hdfs3.format.avro.AvroFormat",
    "confluent.topic.bootstrap.servers" : "localhost:9092",
    "confluent.topic.replication.factor" :"1",
    "transforms" : "AddPrefix",
    "transforms.AddPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex" : ".*",
    "transforms.AddPrefix.replacement" : "copy_of_$0"
  }
}

Note

Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change the confluent.topic.replication.factor to 3 for staging or production use.

Use curl to post a configuration to one of the Kafka Connect Workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect worker(s).

curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors

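Use the following command to update the configuration of an existing connector. The PUT /connectors/<name>/config endpoint expects only the connector properties (the contents of the config object), so this example extracts them from config.json with jq and addresses the connector by its name, hdfs3-source:

jq '.config' config.json | curl -s -X PUT -H 'Content-Type: application/json' --data @- http://localhost:8083/connectors/hdfs3-source/config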
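If you prefer to script these REST calls, the following minimal Python sketch builds equivalent requests with the standard library. `create_request`, `update_request`, and `CONNECT_URL` are hypothetical names, and the worker address is an assumption. Per the Kafka Connect REST API, POST /connectors takes the name and config wrapper, whereas PUT /connectors/<name>/config takes only the flat map of properties.

```python
import json
import urllib.parse
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: one of your Connect workers
HEADERS = {"Content-Type": "application/json"}


def create_request(connector: dict) -> urllib.request.Request:
    """POST /connectors with {"name": ..., "config": {...}}."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(f"{CONNECT_URL}/connectors", data=body,
                                  headers=HEADERS, method="POST")


def update_request(connector: dict) -> urllib.request.Request:
    """PUT /connectors/<name>/config with only the flat config map."""
    name = urllib.parse.quote(connector["name"], safe="")
    body = json.dumps(connector["config"]).encode("utf-8")
    return urllib.request.Request(f"{CONNECT_URL}/connectors/{name}/config", data=body,
                                  headers=HEADERS, method="PUT")


# Usage against a running worker:
# with open("config.json") as f:
#     connector = json.load(f)
# with urllib.request.urlopen(create_request(connector)) as resp:
#     print(resp.status)
```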

To consume records written by the connector to the configured Kafka topic, run the following command:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081  --topic copy_of_test_hdfs --from-beginning

HDFS 3 Source Connector Partitions

The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class.

The following partitioners are available by default:

  • DefaultPartitioner : To use the DefaultPartitioner, set partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner. This partitioner reads data from HDFS 3 files whose paths have the form <prefix>/<topic>/partition=<kafkaPartition>/<topic>+<kafkaPartition>+<startOffset>+<endOffset>.<format> and writes it to the Kafka topic.
  • TimeBasedPartitioner : The io.confluent.connect.storage.partitioner.TimeBasedPartitioner determines the partition from the year, month, day, hour, minutes, and/or seconds. This partitioner requires the following connector configuration properties:
    • The path.format configuration property specifies the pattern used for the <encodedPartition> portion of the HDFS 3 file path. For example, when path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH, the connector reads HDFS 3 files with paths of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>+<endOffset>.<format>.
    • The partition.duration.ms configuration property defines the maximum granularity of the HDFS 3 files within a single encoded partition directory. For example, setting partition.duration.ms=600000 (10 minutes) results in each HDFS 3 file in that directory containing no more than 10 minutes of records.
    • The locale configuration property specifies the JDK’s locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, fr-FR for French (in France). These may vary by Java version; see the available locales.
    • The timezone configuration property specifies the timezone in which dates and times are interpreted. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the “en_US” locale.
    • The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field configuration property.
  • HourlyPartitioner : To use the HourlyPartitioner, set partitioner.class=io.confluent.connect.storage.partitioner.HourlyPartitioner. The HourlyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH and partition.duration.ms=3600000 (one hour, for one HDFS 3 file in each hourly directory). With this partitioner, the connector reads HDFS 3 files with paths of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>+<endOffset>.<format>. This partitioner requires the following connector configuration properties:
    • The locale configuration property specifies the JDK’s locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, fr-FR for French (in France). These may vary by Java version; see the available locales.
    • The timezone configuration property specifies the timezone in which dates and times are interpreted. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the “en_US” locale.
    • The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field configuration property.
  • DailyPartitioner : To use the DailyPartitioner, set partitioner.class=io.confluent.connect.storage.partitioner.DailyPartitioner. The DailyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd and partition.duration.ms=86400000 (one day, for one HDFS 3 file in each daily directory). With this partitioner, the connector reads HDFS 3 files with paths of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/<topic>+<kafkaPartition>+<startOffset>+<endOffset>.<format>. This partitioner requires the following connector configuration properties:
    • The locale configuration property specifies the JDK’s locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, fr-FR for French (in France). These may vary by Java version; see the available locales.
    • The timezone configuration property specifies the timezone in which dates and times are interpreted. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the “en_US” locale.
    • The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field configuration property.
  • FieldPartitioner : To use the FieldPartitioner, set partitioner.class=io.confluent.connect.storage.partitioner.FieldPartitioner. The <encodedPartition> is always <topicName>/<fieldName>=<fieldValue>, resulting in HDFS 3 file paths of the form <prefix>/<topic>/<fieldName>=<fieldValue>/<topic>+<kafkaPartition>+<startOffset>+<endOffset>.<format>.
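The path layouts listed above can be reproduced with a few string helpers, which is handy for checking where the connector expects to find files. This is a minimal sketch under stated assumptions: `<prefix>` is taken to be topics.dir, time-based paths are computed in UTC only (the real partitioners also honor the timezone and locale settings), offsets are zero-padded to 10 digits, and all function names are hypothetical.

```python
from datetime import datetime, timezone

# (strftime pattern, partition.duration.ms) equivalents of the built-in time partitioners.
HOURLY = ("year=%Y/month=%m/day=%d/hour=%H", 3_600_000)
DAILY = ("year=%Y/month=%m/day=%d", 86_400_000)


def default_path(topics_dir: str, topic: str, partition: int) -> str:
    return f"{topics_dir}/{topic}/partition={partition}"


def time_based_path(topics_dir: str, topic: str, timestamp_ms: int,
                    path_format: str, duration_ms: int) -> str:
    # Floor the timestamp to the start of its partition.duration.ms window (UTC only).
    bucket_ms = timestamp_ms - timestamp_ms % duration_ms
    when = datetime.fromtimestamp(bucket_ms / 1000, tz=timezone.utc)
    return f"{topics_dir}/{topic}/{when.strftime(path_format)}"


def field_path(topics_dir: str, topic: str, field_name: str, field_value) -> str:
    return f"{topics_dir}/{topic}/{field_name}={field_value}"


def file_name(topic: str, partition: int, start: int, end: int,
              extension: str, pad: int = 10) -> str:
    # Assumption: offsets zero-padded to a fixed width, as in sink defaults.
    return f"{topic}+{partition}+{start:0{pad}d}+{end:0{pad}d}.{extension}"


print(time_based_path("/topics", "test_hdfs", 1_700_000_000_000, *HOURLY))
```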

HDFS 3 Source Connector Data Formats

The HDFS 3 Source connector supports the following data formats:

  • Avro Format : To read Avro data, set format.class=io.confluent.connect.hdfs3.format.avro.AvroFormat.
  • JSON Format : To read JSON data, set format.class=io.confluent.connect.hdfs3.format.json.JsonFormat.
  • String Format : To read string data, set format.class=io.confluent.connect.hdfs3.format.string.StringFormat.
  • Parquet Format : To read Parquet data, set format.class=io.confluent.connect.hdfs3.format.parquet.ParquetFormat.

Secure HDFS with Kerberos

The connector supports Kerberos authentication for secure HDFS.

To work with secure HDFS, you need to specify hdfs.authentication.kerberos, connect.hdfs.principal, connect.hdfs.keytab, and hdfs.namenode.principal.

hdfs.authentication.kerberos=true
connect.hdfs.principal=connect-hdfs/_HOST@YOUR-REALM.COM
connect.hdfs.keytab=/full/path/to/the/connector/keytab
hdfs.namenode.principal=namenode-principal

You need to create the Kafka Connect principals and keytab files using Kerberos and distribute the keytab file to all hosts running the connector. Make sure that only the connector user has read access to the keytab file. Currently, the connector requires the principal and the keytab path to be the same on all the hosts running the connector.
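To verify the keytab permission requirement on each host, a quick check like the following can help. It is a sketch that assumes a POSIX host and that you run it as the connector user; `keytab_is_private` is a hypothetical helper, not part of the connector.

```python
import os
import stat


def keytab_is_private(path: str) -> bool:
    """True if the file is owned by the current user and has no group/other bits set."""
    info = os.stat(path)
    owned_by_me = info.st_uid == os.getuid()  # POSIX only
    group_or_other = stat.S_IMODE(info.st_mode) & (stat.S_IRWXG | stat.S_IRWXO)
    return owned_by_me and group_or_other == 0


# Usage, with the keytab path from connect.hdfs.keytab:
# print(keytab_is_private("/full/path/to/the/connector/keytab"))
```

A keytab with mode 600 or 400 passes this check; anything readable by the group or by other users does not.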

Troubleshooting Connector and Task Failures

Stack Trace

You can use the Connect REST API to check the status of the connectors and tasks. If a task or connector has failed, the trace field will include a reason and a stack trace.
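The following sketch queries GET /connectors/<name>/status and collects the trace of every failed component. The response shape (a connector object and a list of tasks, each with a state and, when failed, a trace) follows the Kafka Connect REST API; the helper names and the worker URL are assumptions for illustration.

```python
import json
import urllib.parse
import urllib.request


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """GET /connectors/<name>/status from a Connect worker (base_url is an assumption)."""
    url = f"{base_url}/connectors/{urllib.parse.quote(name, safe='')}/status"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


def failed_traces(status: dict) -> dict:
    """Map "connector" or "task-<id>" to the trace of each FAILED component."""
    failures = {}
    connector = status.get("connector", {})
    if connector.get("state") == "FAILED":
        failures["connector"] = connector.get("trace", "")
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            failures[f"task-{task['id']}"] = task.get("trace", "")
    return failures


# Usage against a running worker:
# for component, trace in failed_traces(fetch_status("hdfs3-source")).items():
#     print(component, trace.splitlines()[0] if trace else "")
```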

Fewer partitions in destination cluster

If there are fewer partitions in the destination cluster than in the source topic, the connector task throws an exception and immediately stops when it tries to write to a Kafka partition that does not exist. You will see the following error messages in the Connect worker log. The recommended practice is to create topics manually in the destination Kafka cluster with the correct number of partitions before running the source connector.

INFO WorkerSourceTask{id=hdfs3-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:409)
INFO WorkerSourceTask{id=hdfs3-source-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:426)
ERROR WorkerSourceTask{id=hdfs3-source-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:431)
ERROR WorkerSourceTask{id=hdfs3-source-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)
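To create destination topics with enough partitions, you can derive a lower bound from the sink filenames, since each name carries its Kafka partition. The sketch below is hypothetical: it assumes you already have a full file listing (for example, from hdfs dfs -ls -R /topics) and that topic names contain no `+`. The result is a minimum, because partitions that never received data leave no files, and the destination topic name may differ if a transform such as RegexRouter renames it.

```python
from collections import defaultdict


def required_partitions(file_paths) -> dict:
    """Return {topic: minimum partition count} implied by sink file names."""
    highest = defaultdict(lambda: -1)
    for path in file_paths:
        # <topic>+<partition>+<startOffset>+<endOffset>.<extension>
        stem = path.rsplit("/", 1)[-1].rsplit(".", 1)[0]
        topic, partition, _start, _end = stem.split("+")
        highest[topic] = max(highest[topic], int(partition))
    return {topic: p + 1 for topic, p in highest.items()}


paths = [
    "/topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro",
    "/topics/test_hdfs/partition=2/test_hdfs+2+0000000000+0000000005.avro",
]
print(required_partitions(paths))
```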

Error Handling

The behavior.on.error configuration property sets how the connector handles errors. It accepts the following values:

  • fail : The connector stops processing when an error occurs. The full batch of records will not be sent to Kafka if any record in the batch is corrupted.
  • ignore : The corrupted record is ignored. The connector continues processing the next record. For Avro, the connector ignores the file containing a corrupted record and continues processing records for the next file.
  • log : Logs an error message and continues processing the next record. For Avro, the connector ignores the file containing a corrupted record and continues processing records for the next file.

For the Parquet format, when behavior.on.error is set to log or ignore, the connector ignores the file containing a corrupted record and continues processing records for the next file.
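The behavior.on.error semantics described above can be modeled in a few lines to reason about what reaches Kafka. This is a hypothetical model, not the connector's implementation: a corrupted record is represented as None, `skip_whole_file` stands for the Avro and Parquet case, and the sketch reads "ignores the file" as dropping all of that file's records.

```python
from enum import Enum


class Behavior(Enum):
    FAIL = "fail"
    IGNORE = "ignore"
    LOG = "log"


class CorruptRecordError(Exception):
    pass


def records_to_send(files, behavior, skip_whole_file, log=print):
    """files: list of (file_name, records); None marks a corrupted record (hypothetical)."""
    sent = []
    for file_name, records in files:
        batch = []
        for index, record in enumerate(records):
            if record is not None:
                batch.append(record)
                continue
            if behavior is Behavior.FAIL:
                # Nothing is returned, so no part of the batch reaches Kafka.
                raise CorruptRecordError(f"{file_name}: record {index} is corrupted")
            if behavior is Behavior.LOG:
                log(f"skipping corrupted record {index} in {file_name}")
            if skip_whole_file:
                batch = []  # Avro/Parquet: ignore the file, move on to the next one
                break
        sent.extend(batch)
    return sent


files = [("a.json", [1, None, 2]), ("b.json", [3])]
print(records_to_send(files, Behavior.IGNORE, skip_whole_file=False))
```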

Note

The connector always ignores a file which is not in <topic>+<partition>+<starting-offset>+<ending-offset>.<extension> format.