FTPS Sink Connector for Confluent Platform¶
The Kafka Connect FTPS Sink Connector provides the capability to export data from Apache Kafka® topics to files in an FTPS server's directory. Supported formats are CSV/TSV, Avro, JSON, and Parquet.
The FTPS Sink connector periodically polls data from Kafka and in turn writes it to files on the FTPS server.
Features¶
The FTPS Sink connector offers the following features:
- Exactly Once Delivery: Records that are exported are delivered with exactly-once semantics regardless of the eventual consistency of FTPS.
- Pluggable Data Format: Out of the box, the connector supports writing data to FTPS files in Avro, CSV/TSV, JSON, and Parquet formats. In general, the connector may accept any format that provides an implementation of the Format interface (see the example following this list).
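For example, selecting one of the bundled formats is a single configuration property; the class named here must implement the Format interface. A minimal sketch, using the same Avro format class that appears in the Quick Start later in this document:
format.class=io.confluent.connect.ftps.sink.format.avro.AvroFormat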
Prerequisites¶
The following are required to run the Kafka Connect FTPS Sink Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above.
- Connect: Confluent Platform 4.1.0 or above.
- Java 1.8
- All the required directories on the FTPS server must be accessible by the connector.
Install the FTPS Sink Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-ftps:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-ftps:1.0.0-preview
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
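A rough sketch of the manual steps, assuming a downloaded ZIP named confluentinc-kafka-connect-ftps-1.0.0-preview.zip and a plugin directory of /usr/share/confluent-hub-components (both are assumptions; adjust them to your download and your worker's plugin.path):
# extract the connector into a directory on the Connect worker's plugin path
unzip confluentinc-kafka-connect-ftps-1.0.0-preview.zip -d /usr/share/confluent-hub-components/
# then make sure the worker configuration includes that directory, for example:
# plugin.path=/usr/share/confluent-hub-components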
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Licensing for license properties and License topic configuration for information about the license topic.
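As a minimal sketch, the license-related properties look like the following (the values are illustrative and match the Quick Start below; leave confluent.license empty for the trial period):
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
# use 3 or more for production; 1 is suitable for development clusters
confluent.topic.replication.factor=1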
Configuration Properties¶
For a complete list of configuration properties for this connector, see FTPS Sink Connector Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Quick Start¶
This quick start uses the FTPS Sink connector to export data produced by the Avro console producer to an FTPS directory.
Start all the necessary services using the Confluent CLI.
Tip
If not already in your PATH, add Confluent’s bin directory by running:
export PATH=<path-to-confluent>/bin:$PATH
confluent local start
Every service will start in order, printing a message with its status:
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
Next, start the Avro console producer to import a few records to Kafka:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_ftps_sink \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
In the console producer, enter the following:
{"f1": "value1"} {"f1": "value2"} {"f1": "value3"}
The three records entered are published to the Kafka topic test_ftps_sink in Avro format.
Configure your connector by first creating a .properties file named quickstart-ftps.properties with the following properties.
# substitute <> with your information
name=FTPSConnector
connector.class=io.confluent.connect.ftps.FtpsSinkConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
tasks.max=3
topics=test_ftps_sink
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=<License. Leave it empty for evaluation license>
format.class=io.confluent.connect.ftps.sink.format.avro.AvroFormat
flush.size=100
ftps.host=<host-address>
ftps.port=<port-number>
ftps.username=<username>
ftps.password=<password>
ftps.working.dir=<destination-directory-on-the-server>
ftps.ssl.key.password=<password>
ftps.ssl.keystore.location=<path-to-keystore>
ftps.ssl.keystore.password=<password>
ftps.ssl.truststore.location=<path-to-truststore>
ftps.ssl.truststore.password=<password>
Start the connector by loading its configuration:
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local load FTPSConnector -- -d etc/kafka-connect-ftps/quickstart-ftps.properties
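Alternatively, you can register the same configuration through the Kafka Connect REST API. The following is an equivalent sketch, assuming the Connect worker's REST endpoint is at the default http://localhost:8083:
curl -X POST -H "Content-Type: application/json" \
  --data '{
    "name": "FTPSConnector",
    "config": {
      "connector.class": "io.confluent.connect.ftps.FtpsSinkConnector",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "key.converter.schema.registry.url": "http://localhost:8081",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://localhost:8081",
      "tasks.max": "3",
      "topics": "test_ftps_sink",
      "confluent.topic.bootstrap.servers": "localhost:9092",
      "confluent.topic.replication.factor": "1",
      "format.class": "io.confluent.connect.ftps.sink.format.avro.AvroFormat",
      "flush.size": "100",
      "ftps.host": "<host-address>",
      "ftps.port": "<port-number>",
      "ftps.username": "<username>",
      "ftps.password": "<password>",
      "ftps.working.dir": "<destination-directory-on-the-server>",
      "ftps.ssl.key.password": "<password>",
      "ftps.ssl.keystore.location": "<path-to-keystore>",
      "ftps.ssl.keystore.password": "<password>",
      "ftps.ssl.truststore.location": "<path-to-truststore>",
      "ftps.ssl.truststore.password": "<password>"
    }
  }' \
  http://localhost:8083/connectors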
Confirm that the connector is in a RUNNING state.
confluent local status FTPSConnector
After some time, check that the data is available in the FTPS working directory:
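One way to inspect the directory from the command line is with an FTPS-capable client such as lftp (lftp is an assumption here; any FTPS client works):
lftp -u '<username>,<password>' -e 'ls <destination-directory-on-the-server>/test_ftps_sink/partition=0; bye' 'ftps://<host-address>:<port-number>'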
You should see a file with the name /test_ftps_sink/partition=0/test_ftps_sink+0+0000000000.avro. The file name is encoded as topic+kafkaPartition+startOffset.format.
You can use avro-tools-1.8.2.jar (available in the metrocast.net Apache download mirror) to extract the content of the file. Move avro-tools-1.8.2.jar to the FTPS working directory and run the following command:
java -jar avro-tools-1.8.2.jar tojson /<working dir>/test_ftps_sink/partition=0/test_ftps_sink+0+0000000000.avro
You should see the following output:
{"f1":"value1"} {"f1":"value2"} {"f1":"value3"}
FTPS File Names¶
The FTPS data model is a flat structure. Each record gets stored into a file, and the name of each file serves as the unique key. Generally, the hierarchy of files in which records get stored follows this format:
<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
Where:
- <prefix> is specified with the connector’s ftps.working.dir configuration property.
- <topic> corresponds to the name of the Kafka topic from which the records were read.
- <encodedPartition> is generated by the FTPS sink connector’s partitioner and is of the format partition=<kafka-partition-number>.
- <kafkaPartition> is the Kafka partition number from which the records were read.
- <startOffset> is the Kafka offset of the first record written to this file.
- <format> is the extension identifying the format in which the records are serialized in this FTPS file. A worked example follows this list.
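For instance, with ftps.working.dir=/data and the Avro format, a file whose first record came from partition 2 of a hypothetical topic named pages at offset 1024 would be named:
/data/pages/partition=2/pages+2+0000001024.avro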
Partitioning Records into FTPS files¶
The FTPS sink connector’s partitioner determines how records read from a Kafka topic are partitioned into FTPS directories. The partitioner preserves the same topic partitions as in Kafka, and records from each topic partition ultimately end up in an FTPS file with a name that includes the Kafka topic and Kafka partition, as illustrated below.
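For example, if test_ftps_sink had three Kafka partitions (the partition count is hypothetical), the connector would maintain one directory per partition under the working directory:
<working-dir>/test_ftps_sink/partition=0/test_ftps_sink+0+0000000000.avro
<working-dir>/test_ftps_sink/partition=1/test_ftps_sink+1+0000000000.avro
<working-dir>/test_ftps_sink/partition=2/test_ftps_sink+2+0000000000.avro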
FTPS File Formats¶
The FTPS sink connector can serialize multiple records into each FTPS file using a number of formats. The connector’s format.class configuration property identifies the name of the Java class that implements the io.confluent.connect.storage.format.Format interface. The FTPS sink connector comes with several implementations:
- Avro: Use format.class=io.confluent.connect.ftps.sink.format.avro.AvroFormat to write the FTPS file as an Avro container file and include the Avro schema in the container file followed by one or more records. The connector’s avro.codec configuration property specifies the Avro compression codec. Valid values are:
  - null (the default) for no Avro compression
  - deflate to use the deflate algorithm as specified in RFC 1951
  - snappy to use Google’s Snappy compression library
  - bzip2 for BZip2 compression
  Optionally set enhanced.avro.schema.support=true to enable enum symbol preservation and package name awareness. A configuration sketch combining these properties follows this list.
- JSON: Use format.class=io.confluent.connect.ftps.sink.format.json.JsonFormat to write the FTPS file as a single JSON array containing a JSON object for each record. The connector’s ftps.compression.type configuration property can be set to none (the default) for no compression or gzip for GZip compression.
- Parquet: Use format.class=io.confluent.connect.ftps.sink.format.parquet.ParquetFormat to write the FTPS file as a Parquet container file that includes the Avro schema followed by one or more records.
- CSV: Use format.class=io.confluent.connect.ftps.sink.format.csv.CsvFormat to write the FTPS file containing a comma-separated line for each record.
- TSV: Use format.class=io.confluent.connect.ftps.sink.format.tsv.TsvFormat to write the FTPS file containing a tab-separated line for each record.
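As a configuration sketch, the format and compression properties described above compose as follows (the values chosen are one possible combination, not defaults):
# Avro container files compressed with Snappy
format.class=io.confluent.connect.ftps.sink.format.avro.AvroFormat
avro.codec=snappy
enhanced.avro.schema.support=true
# or: one JSON array per file, compressed with GZip
# format.class=io.confluent.connect.ftps.sink.format.json.JsonFormat
# ftps.compression.type=gzip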