FTPS Sink Connector for Confluent Platform
The Kafka Connect FTPS Sink Connector exports data from Apache Kafka® topics to files in an FTPS server's directory. Supported formats are CSV/TSV, Avro, JSON, and Parquet.
The FTPS Sink connector periodically polls data from Kafka and writes it to files on the FTPS server.
The FTPS Sink connector offers the following features:
- Exactly Once Delivery: Records that are exported are delivered with exactly-once semantics regardless of the eventual consistency of FTPS.
- Pluggable Data Format: Out of the box, the connector supports writing data to FTPS files in Avro, CSV/TSV, JSON, and Parquet formats. In general, the connector can accept any format that provides an implementation of the io.confluent.connect.storage.format.Format interface.
The following are required to run the Kafka Connect FTPS Sink Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above.
- Connect: Confluent Platform 4.1.0 or above.
- Java 1.8
- All the required directories on the FTPS server must be accessible by the connector.
Install the FTPS Sink Connector
You can install this connector by using the Confluent Hub client, or you can manually download the ZIP file.
Install the connector using Confluent Hub
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-ftps:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-ftps:1.0.0-preview
This quick start uses the FTPS Sink connector to export data produced by the Avro console producer to an FTPS directory.
Start all the necessary services using the Confluent CLI. If the confluent command is not already in your PATH, add Confluent Platform's bin directory to your PATH first, then run:
confluent local services start
Every service will start in order, printing a message with its status:
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
Next, start the Avro console producer to import a few records to Kafka:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_ftps_sink \
In the console producer, enter three records. The records are published to the Kafka topic test_ftps_sink in Avro format.
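The records themselves are not shown above. A typical Avro console-producer session for a quick start like this might look like the following; the value.schema definition and the three record values are illustrative assumptions, not taken from this document:

```
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
```

The schema is passed as an additional property on the kafka-avro-console-producer command line, and each subsequent line entered at the prompt becomes one Avro record.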
Configure your connector by first creating a
.properties file named
quickstart-ftps.properties with the following properties.
# substitute <> with your information
confluent.license=<License. Leave it empty for evaluation license>
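The listing above shows only the license property. A working quickstart-ftps.properties would also need connector, connection, and format settings. The sketch below is an assumption about typical values: only confluent.license, ftps.working.dir, format.class, and the connector name FTPSConnector appear elsewhere in this document; the connector.class value and the ftps.host/ftps.username/ftps.password property names are illustrative guesses, and all <> values are placeholders.

```
name=FTPSConnector
topics=test_ftps_sink
# connector.class and the ftps.host/username/password names below are
# assumptions for illustration; check the connector's configuration reference
connector.class=io.confluent.connect.ftps.FtpsSinkConnector
ftps.host=<host address of the FTPS server>
ftps.username=<username>
ftps.password=<password>
ftps.working.dir=<FTPS directory to write files to>
format.class=io.confluent.connect.ftps.sink.format.avro.AvroFormat
confluent.license=<License. Leave it empty for evaluation license>
```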
Start the connector by loading its configuration:
You must include a double dash (--) between the connector name and your flag. For more information, see the Confluent CLI documentation.
confluent local services connect connector load ftps-sink --config etc/kafka-connect-ftps/quickstart-ftps.properties
Confirm that the connector is in a RUNNING state:
confluent local services connect connector status FTPSConnector
After some time, check that the data is available in the FTPS working directory:
You should see a file named /test_ftps_sink/partition=0/test_ftps_sink+0+0000000000.avro. The file name is encoded as <topic>+<kafkaPartition>+<startOffset>.<format>.
You can use avro-tools-1.8.2.jar (available from an Apache download mirror) to extract the content of the file. Copy avro-tools-1.8.2.jar to the FTPS working directory and run the following command:
java -jar avro-tools-1.8.2.jar tojson /<working dir>/test_ftps_sink/partition=0/test_ftps_sink+0+0000000000.avro
The command prints each record as a JSON object.
FTPS File Names
The FTPS data model is a flat structure. Each record gets stored into a file, and the name of each file serves as the unique key. Generally, the hierarchy of files in which records get stored follows this format:
<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
- <prefix> is specified with the connector's ftps.working.dir configuration property.
- <topic> corresponds to the name of the Kafka topic from which the records were read.
- <encodedPartition> is generated by the FTPS sink connector's partitioner and is of the format partition=<kafkaPartition>.
- <kafkaPartition> is the Kafka partition number from which the records were read.
- <startOffset> is the Kafka offset of the first record written to this file.
- <format> is the extension identifying the format in which the records are serialized in this FTPS file.
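Putting the pieces together, the quickstart file name from earlier can be reproduced with a short shell sketch. It assumes an empty <prefix>; the 10-digit zero-padded offset is inferred from the example file name rather than stated explicitly in this document:

```shell
# Reconstruct an FTPS sink file name from its components, using the
# partition=<kafkaPartition> form of <encodedPartition>.
topic=test_ftps_sink
kafka_partition=0
start_offset=0
format=avro

# <topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
printf '/%s/partition=%d/%s+%d+%010d.%s\n' \
  "$topic" "$kafka_partition" "$topic" "$kafka_partition" "$start_offset" "$format"
# prints /test_ftps_sink/partition=0/test_ftps_sink+0+0000000000.avro
```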
Partitioning Records into FTPS files
The FTPS sink connector’s partitioner determines how records read from a Kafka
topic are partitioned into FTPS directories. The partitioner preserves the same
topic partitions as in Kafka, and records from each topic partition ultimately
end up in an FTPS file with a name that includes the Kafka topic and Kafka partition.
FTPS File Formats
The FTPS sink connector can serialize multiple records into each FTPS file using
a number of formats. The connector’s
format.class configuration property
identifies the name of the Java class that implements the
io.confluent.connect.storage.format.Format interface. The FTPS sink
connector comes with several implementations:
- format.class=io.confluent.connect.ftps.sink.format.avro.AvroFormat writes the FTPS file as an Avro container file and includes the Avro schema in the container file, followed by one or more records. The connector's avro.codec configuration property specifies the Avro compression codec. Valid values are:
  - null (the default) for no Avro compression
  - deflate to use the deflate algorithm as specified in RFC 1951
  - snappy to use Google's Snappy compression library
  - bzip2 for BZip2 compression
  Set enhanced.avro.schema.support=true to enable enum symbol preservation and package name awareness.
- format.class=io.confluent.connect.ftps.sink.format.json.JsonFormat writes the FTPS file as a single JSON array containing a JSON object for each record. The connector's ftps.compression.type configuration property can be set to none (the default) for no compression or gzip for GZip compression.
- format.class=io.confluent.connect.ftps.sink.format.parquet.ParquetFormat writes the FTPS file as a Parquet container file and includes the Avro schema in the container file, followed by one or more records.
- format.class=io.confluent.connect.ftps.sink.format.csv.CsvFormat writes the FTPS file containing a comma-separated line for each record.
- format.class=io.confluent.connect.ftps.sink.format.tsv.TsvFormat writes the FTPS file containing a tab-separated line for each record.
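As an illustration, a configuration fragment selecting the Avro format with deflate compression might look like the following. Only the format.class value, avro.codec, and enhanced.avro.schema.support properties are taken from this document; the fragment is a sketch, not a complete connector configuration:

```
format.class=io.confluent.connect.ftps.sink.format.avro.AvroFormat
avro.codec=deflate
enhanced.avro.schema.support=true
```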