HDFS 3 Sink Connector for Confluent Platform¶
The Kafka Connect HDFS 3 Sink connector allows you to export data from Kafka topics to HDFS 3.x files in a variety of formats and integrates with Hive to make data immediately available for querying with HiveQL. Note the following:
- This connector is released separately from the HDFS 2.x connector. If you are targeting an HDFS 2.x distribution, see the HDFS 2 Sink connector for Confluent Platform documentation for more details. If you are upgrading from the HDFS 2 Sink connector for Confluent Platform, update connector.class to io.confluent.connect.hdfs3.Hdfs3SinkConnector and partitioner.class to io.confluent.connect.storage.partitioner.*. All HDFS 2.x configurations are applicable in this connector.
- The HDFS 3 Sink connector in your Docker image can only run on a Connect pod where the template includes the runAsUser property, as shown in the following example:

      podTemplate:
        podSecurityContext:
          fsGroup: 1000
          runAsUser: 1000
          runAsNonRoot: true
The connector periodically polls data from Apache Kafka® and writes it to HDFS. The data from each Kafka topic is partitioned by the provided partitioner and divided into chunks. Each chunk of data is represented as an HDFS file whose name contains the topic, the Kafka partition, and the start and end offsets of the chunk. If a partitioner is not specified in the configuration, the default partitioner, which preserves the Kafka partitioning, is used. The size of each data chunk is determined by the number of records written to HDFS, the time written to HDFS, and schema compatibility.
The HDFS 3 Sink connector integrates with Hive and when it is enabled, the connector automatically creates an external Hive partitioned table for each Kafka topic and updates the table according to the available data in HDFS.
Features¶
The HDFS 3 Sink connector includes the following features:
- Exactly once delivery
- Dead Letter Queue
- Multiple tasks
- Extensible data formats
- Extensible partitioner strategies
- Hive integration
- Schema evolution
- Secure HDFS and Hive metastore
Exactly once delivery¶
To achieve exactly-once delivery, the connector manages offsets by encoding the Kafka offset information into the HDFS file name so that it can start from the last committed offsets in case of failures and task restarts. Additionally, to ensure exactly-once delivery, the connector requires the following:
- A write-ahead log (WAL). The WAL ensures each record is written to HDFS exactly once.
- The latest file name which encodes the offset information.
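The role of the file name in recovery can be illustrated with a minimal sketch. It assumes the topic+kafkaPartition+startOffset+endOffset.format naming used by the examples in this document; the function name next_offset and the second sample file name are hypothetical, and the connector's actual recovery logic (which also involves the WAL) is not shown here.

```python
import re

# Pattern for names such as test_hdfs+0+0000000000+0000000002.avro
# (topic+kafkaPartition+startOffset+endOffset.format).
FILE_NAME = re.compile(
    r"^(?P<topic>.+)\+(?P<partition>\d+)\+(?P<start>\d+)\+(?P<end>\d+)\.(?P<ext>\w+)$"
)


def next_offset(file_names, topic, partition):
    """Return the offset to resume from, or None if no committed file matches.

    Hypothetical helper: it only inspects file names and ignores the WAL.
    """
    last_end = None
    for name in file_names:
        match = FILE_NAME.match(name)
        if not match:
            continue  # skip files that do not follow the naming scheme
        if match["topic"] != topic or int(match["partition"]) != partition:
            continue
        end = int(match["end"])
        if last_end is None or end > last_end:
            last_end = end
    return None if last_end is None else last_end + 1


committed = [
    "test_hdfs+0+0000000000+0000000002.avro",
    "test_hdfs+0+0000000003+0000000005.avro",  # made-up second file
]
print(next_offset(committed, "test_hdfs", 0))  # 6
```

Because the end offset is part of the committed file name, a restarted task can work out where to resume without any external bookkeeping, which is the idea the list above describes.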
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Multiple tasks¶
The HDFS 3 Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when multiple files need to be parsed.
Extensible data formats¶
Out of the box, the connector supports writing data to HDFS in Avro and Parquet format. However, you can write other formats to HDFS by extending the Format class.
You must configure format.class and partitioner.class if you want to write other formats to HDFS or use other partitioners. The following example configurations show how to write Parquet format and use the field partitioner:
format.class=io.confluent.connect.hdfs3.parquet.ParquetFormat
partitioner.class=io.confluent.connect.storage.partitioner.FieldPartitioner
You must use the AvroConverter, ProtobufConverter, or JsonSchemaConverter with ParquetFormat for this connector. Attempting to use the JsonConverter (with or without schemas) results in a NullPointerException and a StackOverflowError.
When using the field partitioner, you must set partition.field.name to the name of the record field that is used for partitioning.
Note that if the source Kafka topic is stored as plain JSON, you can’t use a formatter that requires a schema; you can only use the JSON formatter. The following example shows how to use Parquet format and the field partitioner.
Produce test Avro data to the parquet_field_hdfs topic in Kafka.

    ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic parquet_field_hdfs \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"}, {"name":"address","type":"string"}, {"name" : "age", "type" : "int"}, {"name" : "is_customer", "type" : "boolean"}]}'

    # paste each of these messages
    {"name":"Peter", "address":"Mountain View", "age":27, "is_customer":true}
    {"name":"David", "address":"Mountain View", "age":37, "is_customer":false}
    {"name":"Kat", "address":"Palo Alto", "age":30, "is_customer":true}
    {"name":"David", "address":"San Francisco", "age":35, "is_customer":false}
    {"name":"Leslie", "address":"San Jose", "age":26, "is_customer":true}
    {"name":"Dani", "address":"Seatle", "age":32, "is_customer":false}
    {"name":"Kim", "address":"San Jose", "age":30, "is_customer":true}
    {"name":"Steph", "address":"Seatle", "age":31, "is_customer":false}
Create a hdfs3-parquet-field.json file with the following contents:

    {
      "name": "hdfs3-parquet-field",
      "config": {
        "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
        "tasks.max": "1",
        "topics": "parquet_field_hdfs",
        "hdfs.url": "hdfs://localhost:9000",
        "flush.size": "3",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "format.class": "io.confluent.connect.hdfs3.parquet.ParquetFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
        "partition.field.name": "is_customer"
      }
    }
Load the HDFS 3 Sink connector.

    confluent local services connect connector load hdfs3-parquet-field --config hdfs3-parquet-field.json
Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status hdfs3-parquet-field
Validate that the Parquet data is in HDFS.

    # list files in partition called is_customer=true
    hadoop fs -ls /topics/parquet_field_hdfs/is_customer=true

    # the following should appear in the list
    # /topics/parquet_field_hdfs/is_customer=true/parquet_field_hdfs+0+0000000000+0000000002.parquet
    # /topics/parquet_field_hdfs/is_customer=true/parquet_field_hdfs+0+0000000004+0000000004.parquet
Extract the contents of the file using the parquet-tools-1.9.0.jar.

    # replace "<namenode>" with the HDFS name node hostname
    hadoop jar parquet-tools-1.9.0.jar cat --json \
    hdfs://<namenode>/topics/parquet_field_hdfs/is_customer=true/parquet_field_hdfs+0+0000000000+0000000002.parquet
If you experience issues with the previous step, first copy the Parquet file from HDFS to the local filesystem and try again with java.

    hadoop fs -copyToLocal /topics/parquet_field_hdfs/is_customer=true/parquet_field_hdfs+0+0000000000+0000000002.parquet \
    /tmp/parquet_field_hdfs+0+0000000000+0000000002.parquet
    java -jar parquet-tools-1.9.0.jar cat --json /tmp/parquet_field_hdfs+0+0000000000+0000000002.parquet

    # expected output
    {"name":"Peter","address":"Mountain View","age":27,"is_customer":true}
    {"name":"Kat","address":"Palo Alto","age":30,"is_customer":true}
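The file names in this example follow from how flush.size interacts with the field partitioner. The sketch below is a simplified model, not the connector's code: it assumes that every record of a Kafka partition counts toward flush.size and that reaching the limit commits all open files at once. That assumption is inferred from the listing above (the first is_customer=true file spans offsets 0 to 2 but holds only two records); the function name simulate is hypothetical.

```python
from collections import defaultdict

FLUSH_SIZE = 3  # matches "flush.size": "3" in hdfs3-parquet-field.json

# (offset, is_customer) for the eight sample records, in produce order.
records = [(0, True), (1, False), (2, True), (3, False),
           (4, True), (5, False), (6, True), (7, False)]


def simulate(records, flush_size, topic="parquet_field_hdfs", kafka_partition=0):
    """Return the committed file paths under the assumed rotation model."""
    committed = []
    open_files = defaultdict(list)  # partition directory -> buffered offsets
    buffered = 0
    for offset, is_customer in records:
        directory = f"is_customer={str(is_customer).lower()}"
        open_files[directory].append(offset)
        buffered += 1
        if buffered == flush_size:
            # Commit every open file; the name carries first and last offset.
            for name, offsets in sorted(open_files.items()):
                committed.append(
                    f"/topics/{topic}/{name}/"
                    f"{topic}+{kafka_partition}+{offsets[0]:010d}+{offsets[-1]:010d}.parquet"
                )
            open_files.clear()
            buffered = 0
    return committed  # records still buffered at the end are not committed


for path in simulate(records, FLUSH_SIZE):
    print(path)
```

Under this model the two is_customer=true paths match the listing shown earlier, and the last two records (offsets 6 and 7) stay uncommitted until another record arrives or a time-based rotation occurs.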
Extensible partitioner strategies¶
The connector supports a variety of partitioners, but you can also implement your own partitioner by extending the io.confluent.connect.storage.partitioner.Partitioner class.
You can also customize existing partitioners such as the time-based partitioner by extending the io.confluent.connect.storage.partitioner.TimeBasedPartitioner class.
The following partitioners are available by default:
- DefaultPartitioner: The default partitioner reuses the Kafka record’s partition when encoding the partition.
- TimeBasedPartitioner: The time-based partitioner allows partitions to be created based on a set time interval. The HourlyPartitioner and DailyPartitioner preconfigure the intervals, but this partitioner allows full control over the partition duration.
- HourlyPartitioner: A subclass of the TimeBasedPartitioner that creates partitions on an hourly basis.
- DailyPartitioner: A subclass of the TimeBasedPartitioner that creates partitions on a daily basis.
- FieldPartitioner: A partitioner that uses record values of the configured partition.field.name to determine partitions.
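As a rough illustration of what each strategy computes, the sketch below mirrors the directory names that appear in this document's examples (partition=0 and is_customer=true). All three function names are hypothetical, the real partitioners are Java classes, and the time bucketing shown here is an assumption that ignores time zones and path formatting.

```python
def default_partition_dir(kafka_partition: int) -> str:
    """Directory name that reuses the Kafka partition, e.g. partition=0."""
    return f"partition={kafka_partition}"


def field_partition_dir(record: dict, field_name: str) -> str:
    """Directory name derived from a record field, e.g. is_customer=true."""
    value = record[field_name]
    if isinstance(value, bool):
        value = str(value).lower()  # render booleans as true/false
    return f"{field_name}={value}"


def time_bucket_start(timestamp_ms: int, duration_ms: int) -> int:
    """Start of the fixed-size time bucket that contains timestamp_ms."""
    return timestamp_ms - timestamp_ms % duration_ms


HOUR_MS = 60 * 60 * 1000

print(default_partition_dir(0))
print(field_partition_dir({"name": "Kat", "is_customer": True}, "is_customer"))
print(time_bucket_start(7_500_000, HOUR_MS))
```

Records that map to the same directory name (or the same time bucket) end up in the same partition, which is what lets Hive prune partitions when querying.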
Hive integration¶
The HDFS 3 Sink connector supports Hive integration out of the box. When Hive integration is enabled, the connector creates a Hive external partitioned table for each topic exported to HDFS.
At a minimum, you need to specify hive.integration, hive.metastore.uris, and schema.compatibility when integrating Hive.
Here is an example configuration:
hive.integration=true
# use the FQDN for the host part
hive.metastore.uris=thrift://localhost:9083
schema.compatibility=BACKWARD
You should adjust hive.metastore.uris according to your Hive configurations.
As connector tasks are long running, the connections to the Hive metastore are kept open until tasks are stopped. In the default Hive configuration, reconnecting to the Hive metastore creates a new connection. When the number of tasks is large, it is possible that the retries can cause the number of open connections to exceed the maximum allowed connections in the operating system. For this reason, you should set hcatalog.hive.client.cache.disabled to true in hive.xml.
If you don’t specify hive.metastore.uris, the connector uses a local metastore with Derby in the directory running the connector. You must run Hive in this directory in order to see the Hive metadata change.
To support schema evolution, set schema.compatibility to BACKWARD, FORWARD, or FULL. This ensures that Hive can query the data written to HDFS with different schemas using the latest Hive table schema.
Schema evolution¶
The connector supports schema evolution and varying schema compatibility levels. When the connector observes a schema change, it projects to the proper schema according to the schema.compatibility configuration.
Important
Schema evolution only works if the records are generated with the default naming strategy, which is TopicNameStrategy. An error may occur if other naming strategies are used, because the records are not compatible with each other. Set schema.compatibility to NONE if other naming strategies are used. This may result in small object files because the sink connector creates a new file every time the schema ID changes between records. See Subject Name Strategy for more information about naming strategies.
If Hive integration is enabled, you must set schema.compatibility to BACKWARD, FORWARD, or FULL. This ensures that the Hive table schema is able to query all the data under a topic written with different schemas. If schema.compatibility is set to BACKWARD or FULL, the Hive table schema for a topic is equivalent to the latest schema in the HDFS files under that topic that can query the whole data of that topic. If schema.compatibility is set to FORWARD, the Hive table schema of a topic is equivalent to the oldest schema of the HDFS files under that topic that can query the whole data of that topic.
The following are descriptions of each compatibility type:
- NO Compatibility: By default, schema.compatibility is set to NONE. In this case, the connector ensures that each file written to HDFS has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with the new schema in new files.
- BACKWARD Compatibility: If schema evolution is backward compatible, the connector can always use the latest schema to query all the data uniformly. For example, removing fields is a backward compatible change to a schema. When the connector encounters records written with the old schema (that contain the removed fields), it ignores those fields. Adding a field with a default value is also backward compatible.
  If BACKWARD is specified in schema.compatibility, the connector keeps track of the latest schema used in writing data to HDFS. If a data record with a schema version larger than the current latest schema arrives, the connector commits the current set of files and writes the data record with the new schema to new files. For data records that arrive later with a schema of an earlier version, the connector projects the data record to the latest schema before writing to the same set of files in HDFS.
- FORWARD Compatibility: If schema evolution is forward compatible, the connector uses the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema uses the default value when the field is missing.
  If FORWARD is specified in schema.compatibility, the connector projects the data to the oldest schema before writing to the same set of files in HDFS.
- FULL Compatibility: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema.
  If FULL is specified in schema.compatibility, the connector performs the same action as BACKWARD.
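The projection step described for BACKWARD compatibility can be sketched as follows. This is a conceptual illustration on plain dictionaries, not the connector's implementation: the function project_to_schema, the REQUIRED marker, and the country field are all made up for the example, and a schema is modeled simply as a mapping from field name to default value.

```python
REQUIRED = object()  # marks a field that has no default value


def project_to_schema(record: dict, schema_fields: dict) -> dict:
    """Project a record onto a target schema.

    Fields absent from the target schema are dropped; fields missing from
    the record are filled from the schema default when one exists.
    """
    projected = {}
    for name, default in schema_fields.items():
        if name in record:
            projected[name] = record[name]
        elif default is not REQUIRED:
            projected[name] = default
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    return projected


# Latest schema: "address" was removed and "country" was added with a default.
latest_schema = {"name": REQUIRED, "age": REQUIRED, "country": "unknown"}

# A late-arriving record that was written with the earlier schema.
old_record = {"name": "Peter", "age": 27, "address": "Mountain View"}

print(project_to_schema(old_record, latest_schema))
# {'name': 'Peter', 'age': 27, 'country': 'unknown'}
```

Both changes in the example (removing a field, adding a field with a default) are the backward compatible changes named above, which is why the old record can be rewritten in the latest schema without losing required data.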
Secure HDFS and Hive metastore¶
The connector supports Kerberos authentication to support both secure HDFS and Hive metastore.
To work with secure HDFS and Hive metastore, you need to specify hdfs.authentication.kerberos, connect.hdfs.principal, connect.hdfs.keytab, and hdfs.namenode.principal:
hdfs.authentication.kerberos=true
connect.hdfs.principal=connect-hdfs/_HOST@YOUR-REALM.COM
connect.hdfs.keytab=path to the connector keytab
hdfs.namenode.principal=namenode principal
You need to create the Kafka Connect principals and keytab files using Kerberos and distribute the keytab file to all hosts running the connector. Make sure that only the connector user has read access to the keytab file. Currently, the connector requires the principal and the keytab path to be the same on all the hosts running the connector.
Note that when security is enabled, you need to use the FQDN for the host portion of hdfs.url and hive.metastore.uris.
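A quick pre-flight check for the FQDN requirement could look like the sketch below. It is only a heuristic under stated assumptions: a host is treated as fully qualified when it contains a dot (so IP addresses also pass), hive.metastore.uris is assumed to accept a comma-separated list, and the function name non_fqdn_hosts and the example host names are hypothetical.

```python
from urllib.parse import urlparse


def non_fqdn_hosts(config: dict) -> dict:
    """Return {setting: [uris]} for URIs whose host does not look like an FQDN."""
    problems = {}
    for key in ("hdfs.url", "hive.metastore.uris"):
        value = config.get(key)
        if not value:
            continue  # setting not present, nothing to check
        for uri in value.split(","):
            uri = uri.strip()
            host = urlparse(uri).hostname
            # Heuristic assumption: an FQDN contains at least one dot.
            if host is None or "." not in host:
                problems.setdefault(key, []).append(uri)
    return problems


config = {
    "hdfs.url": "hdfs://localhost:9000",
    "hive.metastore.uris": "thrift://metastore.example.com:9083",
}
print(non_fqdn_hosts(config))
# {'hdfs.url': ['hdfs://localhost:9000']}
```

Running a check like this before enabling hdfs.authentication.kerberos can catch a leftover localhost URL from a non-secure test setup.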
Limitations¶
The HDFS 3 Sink connector does not permit you to run multiple instances of the connector in the same Kerberos environment.
Install HDFS 3 Sink connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
The following are required to run the Kafka Connect HDFS 3 Sink connector:
- Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later
- Connect: Confluent Platform 3.3.0 or later or Kafka 0.11.0 or later
- Java 8 or 11
- HDFS 3.x cluster
- Hive 3.x
- You must install the connector on every machine where Connect will run.
- An installation of the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- An installation of the latest connector version.
This connector ships with HDFS 3.x client and Hive 3.x libraries, which are not compatible with HDFS 2.x or Hive 2.x clusters.
Install the connector using the Confluent CLI¶
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-hdfs3:latest
You can install a specific version by replacing latest with a version number as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-hdfs3:1.0.0-preview
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, see Confluent License Properties.
Configuration properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for HDFS 3 Sink Connector for Confluent Platform.
Quick start¶
This quick start uses the HDFS 3 Sink connector to export data produced by the Avro console producer to HDFS.
Before you start Confluent Platform, ensure the following:
- Hadoop is running locally or remotely and that you know the HDFS URL. For Hive integration, you must have Hive installed and know the metastore thrift URI.
- The connector user has write access to the directories specified in topics.dir and logs.dir. The default value of topics.dir is /topics and the default value of logs.dir is /logs. If you don’t specify the two configurations, ensure the connector user has write access to /topics and /logs. You may need to create /topics and /logs before running the connector, as the connector likely doesn’t have write access to /.
This quick start assumes that you started the required services with the default configurations; you should make necessary changes according to the actual configurations used. This quick start also assumes that security is not configured for HDFS and Hive metastore. To make the necessary security configurations, see the Secure HDFS and Hive Metastore section.
To get started, complete the following steps:
Install the connector using the following CLI command:

    # run from your Confluent Platform installation directory
    confluent connect plugin install confluentinc/kafka-connect-hdfs3:latest
Start Confluent Platform.
confluent local services start
Produce test Avro data to the test_hdfs topic in Kafka.

    ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_hdfs \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

    # paste each of these messages
    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
Create a hdfs3-sink.json file with the following contents:

    {
      "name": "hdfs3-sink",
      "config": {
        "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
        "tasks.max": "1",
        "topics": "test_hdfs",
        "hdfs.url": "hdfs://localhost:9000",
        "flush.size": "3",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1"
      }
    }
Note that the first few settings are common settings you’ll specify for all connectors. The topics parameter specifies the topics to export data from, in this case test_hdfs. The HDFS connection URL, hdfs.url, specifies the HDFS to export data to. You should set this according to your configuration. flush.size specifies the number of records the connector needs to write before invoking file commits. For high availability HDFS deployments, set hadoop.conf.dir to a directory that includes hdfs-site.xml and core-site.xml. After hdfs-site.xml is in place and hadoop.conf.dir has been set, hdfs.url may be set to the namenode’s nameservice ID, such as nameservice1.
Load the HDFS 3 Sink connector.
Caution
You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services connect connector load hdfs3-sink --config hdfs3-sink.json
Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status hdfs3-sink
Validate that the Avro data is in HDFS.

    # list files in partition 0
    hadoop fs -ls /topics/test_hdfs/partition=0

    # the following should appear in the list
    # /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro
The file name is encoded as topic+kafkaPartition+startOffset+endOffset.format.
Extract the contents of the file using the avro-tools-1.8.2.jar.
    # replace "<namenode>" with the HDFS name node hostname
    hadoop jar avro-tools-1.8.2.jar tojson \
    hdfs://<namenode>/topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro
If you experience issues with the previous step, first copy the Avro file from HDFS to the local filesystem and try again with java.
    hadoop fs -copyToLocal /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro \
    /tmp/test_hdfs+0+0000000000+0000000002.avro
    java -jar avro-tools-1.8.2.jar tojson /tmp/test_hdfs+0+0000000000+0000000002.avro

    # expected output
    {"f1":"value1"}
    {"f1":"value2"}
    {"f1":"value3"}
If you want to run the quick start with Hive integration, add the following configurations to hdfs3-sink.json:

    "hive.integration": "true",
    "hive.metastore.uris": "<thrift uri to your Hive metastore>",
    "schema.compatibility": "BACKWARD"
After the connector finishes ingesting data to HDFS, you can use Hive to check the data:
beeline -e "SELECT * FROM test_hdfs;"
If the hive.metastore.uris setting is empty, an embedded Hive metastore is created in the directory the connector is started in. Start Hive in that specific directory to query the data.
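Editing the connector JSON by hand makes it easy to drop a comma or a quote, so the Hive settings can also be merged in programmatically. The sketch below is one way to do that, assuming the quick start's connector definition; the function name with_hive and the metastore URI thrift://localhost:9083 are placeholders to adapt to your environment.

```python
import copy
import json

# Connector definition from the quick start (hdfs3-sink.json).
base = {
    "name": "hdfs3-sink",
    "config": {
        "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
        "tasks.max": "1",
        "topics": "test_hdfs",
        "hdfs.url": "hdfs://localhost:9000",
        "flush.size": "3",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
    },
}


def with_hive(connector: dict, metastore_uri: str) -> dict:
    """Return a copy of the connector definition with Hive integration enabled."""
    result = copy.deepcopy(connector)  # leave the original definition untouched
    result["config"].update({
        "hive.integration": "true",
        "hive.metastore.uris": metastore_uri,
        "schema.compatibility": "BACKWARD",
    })
    return result


# Placeholder URI; replace it with the thrift URI of your Hive metastore.
hive_connector = with_hive(base, "thrift://localhost:9083")
print(json.dumps(hive_connector, indent=2))
```

The printed JSON can be saved as hdfs3-sink.json and loaded with the same confluent local services connect connector load command used earlier in the quick start.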