FTPS Sink Connector for Confluent Platform¶
Caution
Preview connectors aren’t currently supported, nor are they recommended for production use.
The Kafka Connect FTPS Sink connector provides the capability to export data from Apache Kafka® topics to files in an FTPS server’s directory. Supported formats are CSV/TSV, Avro, JSON, and Parquet.
The FTPS Sink connector periodically polls data from Kafka and in turn writes it to files on the FTPS server.
Features¶
The FTPS Sink connector includes the following features:
Exactly once delivery¶
Records that are exported are delivered with exactly-once semantics regardless of the eventual consistency of FTPS.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
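For example, failed records can be routed to a DLQ topic through the standard Kafka Connect error-handling properties on the sink connector configuration; the topic name dlq-ftps-sink below is only illustrative:
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-ftps-sink
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true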
Multiple tasks¶
The FTPS Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when multiple files need to be written.
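For example, the quick start configuration later in this document runs up to three tasks:
tasks.max=3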
Pluggable data format¶
Out of the box, the connector supports writing data to FTPS files in Avro,
CSV/TSV, JSON and Parquet formats. In general, the connector may accept any
format that provides an implementation of the Format
interface.
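For example, to select the bundled Avro format, set the format.class connector property (the same setting used in the quick start below):
format.class=io.confluent.connect.ftps.sink.format.avro.AvroFormat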
Install the FTPS Sink Connector¶
The FTPS connector is compatible with Confluent Platform version 4.1 and later. Prior versions do not work with this connector.
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- Kafka Broker: Confluent Platform 3.3.0 or later.
- Connect: Confluent Platform 4.1.0 or later.
- Java 1.8.
- All the required directories and files on the FTPS server must be accessible by the connector.
- The schema can change with each file. For this reason, Confluent recommends setting the topic schema property to NONE.
Install the connector using the Confluent CLI¶
To install the latest connector version using the Confluent CLI, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-ftps:latest
You can install a specific version by replacing latest
with a version number
as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-ftps:1.0.4
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Licensing for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for FTPS Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Quick Start¶
This quick start uses the FTPS Sink connector to export data produced by the Avro console producer to an FTPS directory.
Start all the necessary services using the Confluent CLI.
Tip
If not already in your PATH, add Confluent’s bin directory by running:
export PATH=<path-to-confluent>/bin:$PATH
confluent local services start
Every service will start in order, printing a message with its status:
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
Next, start the Avro console producer to import a few records to Kafka:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_ftps_sink \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
In the console producer, enter the following:
{"f1": "value1"} {"f1": "value2"} {"f1": "value3"}
The three records entered are published to the Kafka topic test_ftps_sink in Avro format.
Configure your connector by first creating a .properties file named quickstart-ftps.properties with the following properties:
# substitute <> with your information
name=FTPSConnector
connector.class=io.confluent.connect.ftps.FtpsSinkConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
tasks.max=3
topics=test_ftps_sink
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=<License. Leave it empty for evaluation license>
format.class=io.confluent.connect.ftps.sink.format.avro.AvroFormat
flush.size=100
ftps.host=<host-address>
ftps.port=<port-number>
ftps.username=<username>
ftps.password=<password>
ftps.working.dir=<destination-directory-on-the-server>
ftps.ssl.key.password=<password>
ftps.ssl.keystore.location=<path-to-keystore>
ftps.ssl.keystore.password=<password>
ftps.ssl.truststore.location=<path-to-truststore>
ftps.ssl.truststore.password=<password>
Start the connector by loading its configuration:
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load ftps-sink --config etc/kafka-connect-ftps/quickstart-ftps.properties
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status FTPSConnector
After some time, check that the data is available in the FTPS working directory:
You should see a file with the name /test_ftps_sink/partition=0/test_ftps_sink+0+0000000000.avro. The file name is encoded as topic+kafkaPartition+startOffset.format.
To extract the content of the file, you can use avro-tools-1.8.2.jar (available in the Apache Archives). Move avro-tools-1.8.2.jar to the FTPS working directory and run the following command:
java -jar avro-tools-1.8.2.jar tojson /<working dir>/test_ftps_sink/partition=0/test_ftps_sink+0+0000000000.avro
You should see the following output:
{"f1":"value1"} {"f1":"value2"} {"f1":"value3"}
FTPS File Names¶
The FTPS data model is a flat structure. Records are stored in files, and the name of each file serves as the unique key. Generally, the hierarchy of files in which records are stored follows this format:
<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
Where:
- <prefix> is specified with the connector’s ftps.working.dir configuration property.
- <topic> corresponds to the name of the Kafka topic from which the records were read.
- <encodedPartition> is generated by the FTPS Sink connector’s partitioner and is of the format partition=<kafka-partition-number>.
- <kafkaPartition> is the Kafka partition number from which the records were read.
- <startOffset> is the Kafka offset of the first record written to this file.
- <format> is the extension identifying the format in which the records are serialized in this FTPS file.
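For example, with ftps.working.dir=/data (an illustrative value), records read from partition 0 of the topic test_ftps_sink and starting at offset 0 would be written in Avro format to:
/data/test_ftps_sink/partition=0/test_ftps_sink+0+0000000000.avro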
Partitioning Records into FTPS files¶
The FTPS Sink connector’s partitioner determines how records read from a Kafka topic are partitioned into FTPS directories. The partitioner preserves the same topic partitions as in Kafka, and records from each topic partition ultimately end up in an FTPS file with a name that includes the Kafka topic and Kafka partition.
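For instance, a topic test_ftps_sink with two Kafka partitions would produce one directory per partition under the working directory (offsets shown are illustrative):
<working-dir>/test_ftps_sink/partition=0/test_ftps_sink+0+0000000000.avro
<working-dir>/test_ftps_sink/partition=1/test_ftps_sink+1+0000000000.avro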
FTPS File Formats¶
The FTPS sink connector can serialize multiple records into each FTPS file using
a number of formats. The connector’s format.class
configuration property
identifies the name of the Java class that implements the
io.confluent.connect.storage.format.Format
interface. The FTPS sink
connector comes with several implementations:
- Avro: Use format.class=io.confluent.connect.ftps.sink.format.avro.AvroFormat to write the FTPS file as an Avro container file that includes the Avro schema followed by one or more records. The connector’s avro.codec configuration property specifies the Avro compression codec. Valid values are:
  - null (the default) for no Avro compression
  - deflate to use the deflate algorithm as specified in RFC 1951
  - snappy to use Google’s Snappy compression library
  - bzip2 for BZip2 compression
  Optionally, set enhanced.avro.schema.support=true to enable enum symbol preservation and package name awareness. See the example configuration after this list.
- JSON: Use format.class=io.confluent.connect.ftps.sink.format.json.JsonFormat to write the FTPS file containing one JSON-serialized record per line. The connector’s ftps.compression.type configuration property can be set to none (the default) for no compression or gzip for GZip compression.
- Parquet: Use format.class=io.confluent.connect.ftps.sink.format.parquet.ParquetFormat to write the FTPS file as a Parquet container file that includes the Parquet schema followed by one or more records.
- CSV: Use format.class=io.confluent.connect.ftps.sink.format.csv.CsvFormat to write the FTPS file containing a comma-separated line for each record.
- TSV: Use format.class=io.confluent.connect.ftps.sink.format.tsv.TsvFormat to write the FTPS file containing a tab-separated line for each record.
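As a sketch, the following connector configuration fragment selects the Avro format with Snappy compression and enhanced Avro schema support; the values are illustrative, and for gzip-compressed JSON output you would instead set the JSON format class together with ftps.compression.type=gzip.
format.class=io.confluent.connect.ftps.sink.format.avro.AvroFormat
avro.codec=snappy
enhanced.avro.schema.support=true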