HDFS 2 Sink Connector for Confluent Platform¶
The Kafka Connect HDFS 2 Sink connector allows you to export data from Kafka topics to HDFS 2.x files in a variety of formats and integrates with Hive to make data immediately available for querying with HiveQL.
Note that this connector is released separately from the HDFS 3.x connector. If you are targeting an HDFS 3.x distribution, see the HDFS 3 Sink connector documentation for more details.
The connector periodically polls data from Kafka and writes it to HDFS. The data from each Kafka topic is partitioned by the provided partitioner and divided into chunks. Each chunk of data is represented as an HDFS file whose filename encodes the topic, Kafka partition, and the start and end offsets of that data chunk. If no partitioner is specified in the configuration, the default partitioner, which preserves the Kafka partitioning, is used. The size of each data chunk is determined by the number of records written to HDFS, the time elapsed since the data was written to HDFS, and schema compatibility.
The HDFS 2 Sink connector integrates with Hive, and when Hive is enabled, the connector creates an external Hive partitioned table for each Kafka topic and updates the table according to the available data in HDFS.
Features¶
The HDFS 2 Sink connector includes the following features:
Exactly once delivery¶
The connector uses a write-ahead log to ensure each record is exported to HDFS exactly once. The connector also manages committed offsets by encoding the Kafka offset information into the HDFS files it writes. Storing the offset information in HDFS files allows the connector to start from the last committed offsets in case of failures and task restarts.
Note that in addition to committing offset information to HDFS, offset information is also sent to Kafka Connect for connector progress monitoring. Upon startup, the HDFS connector attempts to restore offsets from HDFS files. In the absence of files in HDFS, the connector attempts to find offsets for its consumer group in the __consumer_offsets topic. If offsets are not found, the consumer relies on the offset management policy specified in the consumer auto.offset.reset configuration property to start exporting data to HDFS. The default is auto.offset.reset = earliest.
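If you instead want the connector's consumer to start from the latest offsets when no committed offsets are found, one option is a per-connector consumer override. The following is a minimal sketch, and it assumes your Connect worker permits client overrides (for example, connector.client.config.override.policy=All in the worker configuration):
# Hypothetical connector configuration fragment; requires the worker to allow
# client configuration overrides for connectors.
consumer.override.auto.offset.reset=latest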
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Multiple tasks¶
The HDFS 2 Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance when the volume of topic data is large.
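For example, a minimal sketch of scaling the connector out to three tasks (the value is illustrative; choose one based on your topic partition count and throughput):
tasks.max=3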
Extensible data format¶
Out of the box, the connector supports writing data to HDFS in Avro and Parquet format. You can also write other formats to HDFS by extending the Format class.
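For instance, selecting one of the bundled formats (described in Storage object formats below) is a single configuration property; the following sketch uses the Avro format class shipped with the connector:
format.class=io.confluent.connect.hdfs.avro.AvroFormat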
- Hive Integration: The connector supports Hive integration out of the box, and when it is enabled, the connector automatically creates a Hive external partitioned table for each topic exported to HDFS.
- Secure HDFS and Hive Metastore Support: The connector supports Kerberos authentication and works with secure HDFS and Hive metastore.
- Pluggable Partitioner: The connector supports the default partitioner, a field partitioner, and a time-based partitioner, which includes daily and hourly partitioners. You can implement your own partitioner by extending the Partitioner class. You can also customize the time-based partitioner by extending the TimeBasedPartitioner class.
- Schema Evolution: Schema evolution only works if the records are generated with the default naming strategy, which is TopicNameStrategy. An error may occur if other naming strategies are used, because records are not compatible with each other. schema.compatibility should be set to NONE if other naming strategies are used. This may result in small object files because the sink connector creates a new file every time the schema ID changes between records. See Subject Name Strategy for more information about naming strategies.
Limitations¶
The HDFS 2 Sink connector does not permit you to run multiple HDFS clients with different Kerberos configurations in the same worker. As a result, multiple connector instances running on the same worker cannot:
- Connect to multiple Kerberos environments
- Use different user keytabs
Install the HDFS Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- Confluent CLI (requires a separate installation)
- An installation of the latest (latest) connector version.
Install the connector using the Confluent CLI¶
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-hdfs:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-hdfs:10.2.0
Caution
You can’t mix schema and schemaless records in storage using kafka-connect-storage-common. Attempting this causes a runtime exception. If you are using the self-managed version of this connector, this issue will be evident when you review the log files (only available for the self-managed connector).
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
This connector is available under the Confluent Community License.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for HDFS 2 Sink Connector for Confluent Platform.
For an example of how to connect Kafka Connect to Confluent Cloud, see Distributed Cluster.
Quick start¶
This quick start uses the HDFS connector to export data produced by the Avro console producer to HDFS and assumes the following:
- You have started the required services with the default configurations; you should make any necessary changes according to the actual configurations you use.
- Security is not configured for HDFS and Hive metastore. To make the necessary security configurations, see Secure HDFS and Hive metastore.
Before you start Confluent Platform, make sure Hadoop is running locally or remotely and that you know the HDFS URL. For Hive integration, you need to have Hive installed and to know the metastore thrift URI. You also need to ensure the connector user has write access to the directories specified in topics.dir and logs.dir. The default value of topics.dir is /topics and the default value of logs.dir is /logs; if you don't specify these two configurations, make sure that the connector user has write access to /topics and /logs. You may need to create /topics and /logs before running the connector, as the connector usually doesn't have write access to /.
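For example, a minimal sketch of pre-creating these directories with the HDFS CLI (the connect user and group names are hypothetical; substitute the account your connector runs as):
hadoop fs -mkdir -p /topics /logs
hadoop fs -chown connect:connect /topics /logs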
Complete the following steps:
Start all the necessary services using the Confluent CLI.
If not already in your PATH, add Confluent's bin directory by running:
export PATH=<path-to-confluent>/bin:$PATH
confluent local services start
Every service will start in order, printing a message with its status:
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
Start the Avro console producer to import a few records to Kafka:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_hdfs \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
In the console producer, enter the following:
{"f1": "value1"} {"f1": "value2"} {"f1": "value3"}
The three records entered are published to the Kafka topic test_hdfs in Avro format.
Before starting the connector, ensure the configurations in etc/kafka-connect-hdfs/quickstart-hdfs.properties are properly set to your Hadoop configuration (for example, hdfs.url points to the proper HDFS and uses the FQDN in the host part). Then start the connector by loading its configuration with the following command.
Note that you must include a double dash (--) between the topic name and your flag. For more information, see this post.
confluent local services connect connector load hdfs-sink --config etc/kafka-connect-hdfs/quickstart-hdfs.properties
{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test_hdfs",
    "hdfs.url": "hdfs://localhost:9000",
    "flush.size": "3",
    "name": "hdfs-sink"
  },
  "tasks": []
}
Verify the connector started successfully by viewing the Connect worker’s log:
confluent local services connect log
Towards the end of the log you should see that the connector starts, logs a few messages, and then exports data from Kafka to HDFS. Once the connector finishes ingesting data to HDFS, check that the data is available in HDFS:
hadoop fs -ls /topics/test_hdfs/partition=0
You should see a file with the name /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro. The file name is encoded as topic+kafkaPartition+startOffset+endOffset.format.
You can use avro-tools-1.8.2.jar (available in Apache mirrors) to extract the content of the file. Run avro-tools directly on Hadoop as:
hadoop jar avro-tools-1.8.2.jar tojson \
  hdfs://<namenode>/topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro
where <namenode> is the HDFS name node hostname. Or, if you experience issues, first copy the Avro file from HDFS to the local filesystem and try again with Java:
hadoop fs -copyToLocal /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro \
  /tmp/test_hdfs+0+0000000000+0000000002.avro
java -jar avro-tools-1.8.2.jar tojson /tmp/test_hdfs+0+0000000000+0000000002.avro
You should see the following output:
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
Stop the Kafka Connect worker as well as all the rest of Confluent Platform by running:
confluent local stop
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
You may also stop all the services and wipe out any data generated during this quick start by running the following command:
confluent local destroy
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE
Note that if you want to run the quick start with Hive integration, before starting the connector you need to add the following configurations to etc/kafka-connect-hdfs/quickstart-hdfs.properties:
hive.integration=true
hive.metastore.uris=<thrift URI of your Hive metastore>
schema.compatibility=BACKWARD
After the connector finishes ingesting data to HDFS, you can use Hive to check the data:
hive> SELECT * FROM test_hdfs;
If you leave hive.metastore.uris empty, an embedded Hive metastore is created in the directory where the connector is started. You need to start Hive in that specific directory to query the data.
Configuration¶
This section gives example configurations that cover common scenarios. For a complete description of the available configuration options, see Configuration Reference for HDFS 2 Sink Connector for Confluent Platform.
Example¶
Here is the content of etc/kafka-connect-hdfs/quickstart-hdfs.properties:
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs
hdfs.url=hdfs://localhost:9000
flush.size=3
The first few settings are common settings you’ll specify for all connectors. The topics property specifies the topics to export data from, in this case test_hdfs. The hdfs.url property specifies the HDFS to write data to; set this according to your configuration. The flush.size property specifies the number of records the connector needs to write before invoking file commits.
For high-availability HDFS deployments, set hadoop.conf.dir to the directory containing hdfs-site.xml. Then, set hdfs.url to the Namenode nameservice ID (for example, nameservice1).
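A minimal sketch of such a high-availability configuration, assuming a hypothetical Hadoop configuration directory and nameservice ID:
hadoop.conf.dir=/etc/hadoop/conf
hdfs.url=hdfs://nameservice1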
Hive integration¶
At minimum, you need to specify hive.integration, hive.metastore.uris, and schema.compatibility when integrating Hive. Here is an example configuration:
hive.integration=true
hive.metastore.uris=thrift://localhost:9083 # FQDN for the host part
schema.compatibility=BACKWARD
You should adjust hive.metastore.uris according to your Hive configuration. Also, you should note the following:
- If you don’t specify hive.metastore.uris, the connector uses a local metastore with Derby in the directory where the connector runs. You need to run Hive in this directory in order to see the Hive metadata changes.
- Because connector tasks are long running, the connections to the Hive metastore are kept open until tasks are stopped. In the default Hive configuration, reconnecting to the Hive metastore creates a new connection. When the number of tasks is large, it is possible that the retries can cause the number of open connections to exceed the maximum number of connections allowed by the operating system. Thus, it is recommended to set hcatalog.hive.client.cache.disabled to true in hive-site.xml.
Also, to support schema evolution, set schema.compatibility to BACKWARD, FORWARD, or FULL. This ensures that Hive can query the data written to HDFS with different schemas using the latest Hive table schema. For more information on schema compatibility, see Schema Evolution.
Secure HDFS and Hive metastore¶
To work with secure HDFS and Hive metastore, you need to specify hdfs.authentication.kerberos, connect.hdfs.principal, connect.hdfs.keytab, and hdfs.namenode.principal:
hdfs.authentication.kerberos=true
connect.hdfs.principal=connect-hdfs/_HOST@YOUR-REALM.COM
connect.hdfs.keytab=path to the connector keytab
hdfs.namenode.principal=namenode principal
You need to create the Kafka Connect principals and keytab files using Kerberos and distribute the keytab file to all hosts running the connector. Ensure that only the connector user has read access to the keytab file. Also, you should note the following:
- When security is enabled, you need to use the FQDN for the host part of hdfs.url and hive.metastore.uris.
- Currently, the connector requires that the principal and the keytab path be the same on all the hosts running the connector. The hostname in the hdfs.namenode.principal configuration should be the fully qualified domain name (FQDN) of the Namenode host rather than the _HOST placeholder.
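For example, a hedged sketch of a secure configuration that uses FQDNs; all hostnames, the realm, and the keytab path are hypothetical placeholders:
hdfs.url=hdfs://namenode.example.com:9000
hive.metastore.uris=thrift://hive-metastore.example.com:9083
hdfs.authentication.kerberos=true
connect.hdfs.principal=connect-hdfs/connect-host.example.com@EXAMPLE.COM
connect.hdfs.keytab=/etc/security/keytabs/connect-hdfs.keytab
hdfs.namenode.principal=hdfs/namenode.example.com@EXAMPLE.COM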
Partitioners and Storage¶
The storage connector’s partitioner determines how records read from an Apache Kafka® topic are partitioned into storage objects. The partitioner determines the <encodedPartition> portion of the storage object name. The partitioner is specified in the connector configuration with the partitioner.class configuration property.
Partitioners¶
The connector comes with the following partitioners:
- Default Kafka Partitioner: The io.confluent.connect.storage.partitioner.DefaultPartitioner preserves the same topic partitions as the partitions in the Kafka records. Each topic partition ultimately ends up as a storage object with a name that includes both the Kafka topic and Kafka partition. The <encodedPartition> is always <topicName>/partition=<kafkaPartition>, resulting in storage object names like <prefix>/<topic>/partition=<kafkaPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>.
- Field Partitioner: The io.confluent.connect.storage.partitioner.FieldPartitioner determines the partition from the field within each record identified by the connector’s partition.field.name configuration property, which has no default. This partitioner requires STRUCT record type values. The <encodedPartition> is always <topicName>/<fieldName>=<fieldValue>, resulting in storage object names of the form <prefix>/<topic>/<fieldName>=<fieldValue>/<topic>+<kafkaPartition>+<startOffset>.<format>.
- Time-Based Partitioner: The io.confluent.connect.storage.partitioner.TimeBasedPartitioner determines the partition from the year, month, day, hour, minutes, and seconds. This partitioner requires the following connector configuration properties (see the example configuration after this list):
  - The path.format configuration property specifies the pattern used for the <encodedPartition> portion of the storage object name. For example, when path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH, storage object names have the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>.
  - The partition.duration.ms configuration property defines the maximum granularity of the storage objects within a single encoded partition directory. For example, setting partition.duration.ms=600000 (10 minutes) results in each storage object in that directory having no more than 10 minutes of records.
  - The locale configuration property specifies the locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, and fr-FR for French (in France). These may vary by Java version.
  - The timezone configuration property specifies the timezone in which the dates and times are treated. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version.
  - The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field configuration property.
- Daily Partitioner: The io.confluent.connect.storage.partitioner.DailyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd and partition.duration.ms=86400000 (one day, for one storage object in each daily directory). This partitioner always results in storage object names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/<topic>+<kafkaPartition>+<startOffset>.<format>. This partitioner requires the following connector configuration properties:
  - The locale configuration property specifies the locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, and fr-FR for French (in France). These may vary by Java version.
  - The timezone configuration property specifies the timezone in which the dates and times are treated. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version.
  - The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field configuration property.
- Hourly Partitioner: The io.confluent.connect.storage.partitioner.HourlyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH and partition.duration.ms=3600000 (one hour, for one storage object in each hourly directory). This partitioner always results in storage object names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>. This partitioner requires the following connector configuration properties:
  - The locale configuration property specifies the locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, and fr-FR for French (in France). These may vary by Java version.
  - The timezone configuration property specifies the timezone in which the dates and times are treated. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version.
  - The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field configuration property. As described in the following section, the choice of timestamp.extractor affects whether the storage connector can support exactly once delivery.
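As referenced above, the following is a hedged sketch of a time-based partitioner configuration; the specific values are illustrative only:
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
partition.duration.ms=3600000
locale=en-US
timezone=UTC
timestamp.extractor=Record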
You can also choose to use a custom partitioner by implementing the io.confluent.connect.storage.partitioner.Partitioner interface, packaging your implementation into a JAR file, and then completing the following steps:
- Place the JAR file into the Confluent Platform directory path share/java/kafka-connect-<storage-connector-name> on each worker node.
- Restart all of the Connect worker nodes.
- Configure storage connectors to use your fully-qualified partitioner class name.
Storage object formats¶
The storage connector can serialize multiple records into each storage object using a number of formats. The connector’s format.class configuration property identifies the name of the Java class that implements the io.confluent.connect.storage.format.Format interface. Several of these implementations are described below:
- Avro: Use format.class=io.confluent.connect.hdfs.avro.AvroFormat to write the storage object as an Avro container file that includes the Avro schema followed by one or more records. The connector’s avro.codec configuration property specifies the Avro compression codec. Values are listed below:
  - null (the default) for no Avro compression.
  - deflate to use the deflate algorithm as specified in RFC 1951.
  - snappy to use the Google Snappy compression library.
  - bzip2 for BZip2 compression.
  Optionally, set enhanced.avro.schema.support=true to enable enum symbol preservation and package name awareness.
- JSON: Use format.class=io.confluent.connect.hdfs.json.JsonFormat to write the storage object as a file containing one JSON serialized record per line. The connector’s az.compression.type configuration property can be set to none (the default) for no compression or gzip for GZip compression.
- Parquet: Use format.class=io.confluent.connect.hdfs.parquet.ParquetFormat to write the storage object as a file in the Parquet columnar storage format (see the example configuration after this list). The connector’s parquet.codec configuration property specifies the Parquet compression codec. Values are listed below:
  - snappy (the default) to use the Google Snappy compression library.
  - none for no compression.
  - gzip to use the GNU GZip compression library.
  - lzo to use the LZO (Lempel–Ziv–Oberhumer) compression library.
  - brotli to use the Google Brotli compression library.
  - lz4 to use the BSD licensed LZ4 compression library.
  - zstd to use Facebook’s ZStandard compression library.
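As referenced above, the following is a minimal sketch of a Parquet format configuration; the codec choice is illustrative:
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
parquet.codec=snappy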
You can also choose to use a custom format by implementing the io.confluent.connect.storage.format.Format interface, packaging your implementation into a JAR file, and then completing the following steps:
- Place the JAR file into the Confluent Platform directory path share/java/kafka-connect-<storage-connector-name> on each worker node.
- Restart all of the Connect worker nodes.
- Configure storage connectors to use your fully-qualified format class name.
Note
For this connector, you must use the AvroConverter, ProtobufConverter, or JsonSchemaConverter with ParquetFormat. Attempting to use the JsonConverter (with or without schemas) results in a NullPointerException and a StackOverflowException.
Storage object uploads¶
As the storage connector processes each record, it uses the partitioner to determine which encoded partition to write the record. This continues for each partition until the connector determines that a partition has enough records and should be flushed and uploaded to storage using the storage object name for that partition. This technique of knowing when to flush a partition file and upload it to storage is called the rotation strategy, and there are a number of ways to control this behavior.
Maximum number of records: The connector’s flush.size configuration property specifies the maximum number of records that should be written to a single storage object. There is no default for this setting.
Important
Rotation strategy logic: In the following rotation strategies, the logic to flush files to storage is triggered when a new record arrives, after the defined interval or scheduled interval time. Flushing files is also triggered periodically by the offset.flush.interval.ms setting defined in the Connect worker configuration. The offset.flush.interval.ms setting defaults to 60000 ms (60 seconds). If you enable the properties rotate.interval.ms or rotate.schedule.interval.ms and the ingestion rate is low, you should set offset.flush.interval.ms to a smaller value so that records flush at the rotation interval (or close to it). Leaving offset.flush.interval.ms set to the default 60 seconds may cause records to stay in an open file for longer than expected if no new records get processed that trigger rotation.
Maximum span of record time: In this rotation strategy, the connector’s rotate.interval.ms property specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. The timestamp for each file starts with the record timestamp of the first record written to the file, as determined by the partitioner’s timestamp.extractor. As long as the next record’s timestamp fits within the timespan specified by the rotate.interval.ms property, the record is written to the file. If a record’s timestamp does not fit within the timespan of rotate.interval.ms, the connector flushes the file, uploads it to storage, and commits the offsets of the records in that file. After this, the connector creates a new file with a timespan that starts with the first record, and writes the first record to the file.
Scheduled rotation: In this rotation strategy, the connector’s rotate.schedule.interval.ms property specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. Unlike rotate.interval.ms, with scheduled rotation the timestamp for each file starts with the system time that the first record is written to the file. You must have the partitioner parameter timezone configured (it defaults to an empty string) when using this configuration property, otherwise the connector fails with an exception.
As long as a record is processed within the timespan specified by rotate.schedule.interval.ms, the record is written to the file. As soon as a new record is processed after the timespan for the current file, the file is flushed, uploaded to storage, and the offsets of the records in the file are committed. A new file is created with a timespan that starts with the current system time, and the new record is written to the file. This configuration is useful when you have to commit your data based on current server time, for example at the beginning of every hour. The default value -1 means that this feature is disabled.
Scheduled rotation uses rotate.schedule.interval.ms to close the file and upload to storage on a regular basis using the current time, rather than the record time. Even if the connector has no more records to process, Connect still calls the connector at least every offset.flush.interval.ms, as defined in the Connect worker’s configuration file. And every time this occurs, the connector uses the current time to determine if the currently opened file should be closed and uploaded to storage.
These strategies can be combined as needed. However, when using either of the two rotation strategies described above, the connector only closes and uploads a file to storage when the next file does not belong based upon the timestamp. In other words, if the connector has no more records to process, the connector may keep the file open for a significant period of time, until the connector can process another record.
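For illustration, the following is a hedged sketch that combines these rotation settings in a connector configuration; all values are arbitrary examples, not recommendations:
# Commit a file after 1000 records, after 10 minutes of record time,
# or hourly based on system time, whichever condition triggers first.
flush.size=1000
rotate.interval.ms=600000
rotate.schedule.interval.ms=3600000
# Required by rotate.schedule.interval.ms.
timezone=UTC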
Schema Evolution¶
The HDFS connector supports schema evolution and reacts to schema changes of data according to the schema.compatibility configuration. This section explains how the connector reacts to schema evolution under different values of schema.compatibility. The schema.compatibility property can be set to NONE, BACKWARD, FORWARD, or FULL, which mean NO compatibility, BACKWARD compatibility, FORWARD compatibility, and FULL compatibility, respectively.
Important
Schema evolution only works if the records are generated with the default naming strategy, which is TopicNameStrategy. An error may occur if other naming strategies are used, because records are not compatible with each other. schema.compatibility should be set to NONE if other naming strategies are used. This may result in small object files because the sink connector creates a new file every time the schema ID changes between records. See Subject Name Strategy for more information about naming strategies.
NO Compatibility: By default, the schema.compatibility is set to NONE. In this case, the connector ensures that each file written to HDFS has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with the new schema in new files.
BACKWARD Compatibility: If a schema is evolved in a backward compatible way, you can always use the latest schema to query all the data uniformly. For example, removing fields is a backward compatible change to a schema, since when you encounter records written with the old schema that contain these fields, you can just ignore them. Adding a field with a default value is also backward compatible.
If BACKWARD is specified in the schema.compatibility, the connector keeps track of the latest schema used in writing data to HDFS, and if a data record with a schema version larger than the current latest schema arrives, the connector commits the current set of files and writes the data record with the new schema to new files. For data records arriving at a later time with a schema of an earlier version, the connector projects the data record to the latest schema before writing to the same set of files in HDFS.
FORWARD Compatibility: If a schema is evolved in a forward compatible way, you can always use the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema will use the default value when the field is missing.
If FORWARD is specified in the schema.compatibility, the connector projects the data to the oldest schema before writing to the same set of files in HDFS.
Full Compatibility: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema.
If FULL is specified in the schema.compatibility, the connector performs the same action as BACKWARD.
If Hive integration is enabled, you need to set schema.compatibility to BACKWARD, FORWARD, or FULL. This ensures that the Hive table schema is able to query all the data under a topic written with different schemas. If the schema.compatibility is set to BACKWARD or FULL, the Hive table schema for a topic is equivalent to the latest schema in the HDFS files under that topic, and it can query the whole data of that topic. If the schema.compatibility is set to FORWARD, the Hive table schema of a topic is equivalent to the oldest schema of the HDFS files under that topic, and it can query the whole data of that topic.