FTPS Sink Connector for Confluent Platform

Caution

Preview connectors aren’t currently supported, nor are they recommended for production use.

The Kafka Connect FTPS Sink connector exports data from Apache Kafka® topics to files in an FTPS server’s directory. Supported formats are CSV/TSV, Avro, JSON, and Parquet.

The FTPS Sink connector periodically polls data from Kafka and writes it to files on the FTPS server.

Features

The FTPS Sink connector includes the following features:

Exactly once delivery

Records that are exported are delivered with exactly-once semantics.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
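
The DLQ is enabled with the standard Kafka Connect error-handling properties for sink connectors. A minimal sketch, in which the topic name dlq-ftps-sink is only an example, adds the following to the connector configuration:

errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-ftps-sink
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true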

Multiple tasks

The FTPS Sink connector supports running one or more tasks. You can specify the number of tasks with the tasks.max configuration parameter. Running multiple tasks can improve performance when many files need to be written.
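
For a sink connector, the number of tasks that actually run is also limited by the number of topic partitions being consumed. A minimal sketch, assuming the subscribed topic has at least three partitions:

topics=test_ftps_sink
tasks.max=3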

Pluggable data format

Out of the box, the connector supports writing data to FTPS files in Avro, CSV/TSV, JSON, and Parquet formats. In general, the connector can write data in any format for which an implementation of the Format interface is available.

Install the FTPS Sink Connector

The FTPS connector is compatible with Confluent Platform version 4.1 and later. Prior versions do not work with this connector.

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.
  • If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
  • Kafka Broker: Confluent Platform 3.3.0 or later.
  • Connect: Confluent Platform 4.1.0 or later.
  • Java 1.8.
  • All the required directories and files on the FTPS server must be accessible by the connector (a quick connectivity check is sketched after this list).
  • The schema can change with each file. For this reason, Confluent recommends setting the topic schema property to NONE.
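
As a quick check of the accessibility prerequisite above, you can verify from the Connect worker host that the working directory is reachable with the connector’s credentials. One way is with curl, which supports explicit FTPS through the --ssl-reqd flag; the command below is only an illustration and is not part of the connector. Add --insecure if the server presents a certificate your host does not trust.

curl --ssl-reqd --user <username>:<password> ftp://<host-address>:<port-number>/<destination-directory-on-the-server>/

If the TLS handshake and login succeed, the command prints a listing of the directory.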

Install the connector using the Confluent CLI

To install the latest connector version using the Confluent CLI, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-ftps:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-ftps:1.0.4

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Licensing for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for FTPS Sink Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Quick Start

This quick start uses the FTPS Sink connector to export data produced by the Avro console producer to an FTPS directory.

  1. Start all the necessary services using Confluent CLI.

    Tip

    If not already in your PATH, add Confluent’s bin directory by running: export PATH=<path-to-confluent>/bin:$PATH

    confluent local services start
    

    Every service will start in order, printing a message with its status:

    Starting Zookeeper
    Zookeeper is [UP]
    Starting Kafka
    Kafka is [UP]
    Starting Schema Registry
    Schema Registry is [UP]
    Starting Kafka REST
    Kafka REST is [UP]
    Starting Connect
    Connect is [UP]
    Starting KSQL Server
    KSQL Server is [UP]
    Starting Control Center
    Control Center is [UP]
    
  2. Next, start the Avro console producer to write a few records to Kafka:

    ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_ftps_sink \
      --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    
  3. In the console producer, enter the following:

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    

    The three records entered are published to the Kafka topic test_ftps_sink in Avro format.

  4. Configure your connector by first creating a .properties file named quickstart-ftps.properties with the following properties.

    # substitute <> with your information
    name=FTPSConnector
    connector.class=io.confluent.connect.ftps.FtpsSinkConnector
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    tasks.max=3
    topics=test_ftps_sink
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    confluent.license=<License. Leave it empty for evaluation license>
    format.class=io.confluent.connect.ftps.sink.format.avro.AvroFormat
    flush.size=100
    ftps.host=<host-address>
    ftps.port=<port-number>
    ftps.username=<username>
    ftps.password=<password>
    ftps.working.dir=<destination-directory-on-the-server>
    ftps.ssl.key.password=<password>
    ftps.ssl.keystore.location=<path-to-keystore>
    ftps.ssl.keystore.password=<password>
    ftps.ssl.truststore.location=<path-to-truststore>
    ftps.ssl.truststore.password=<password>
    
  5. Start the connector by loading its configuration (an equivalent Connect REST API request is shown after these steps):

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load FTPSConnector --config etc/kafka-connect-ftps/quickstart-ftps.properties
    
  6. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status FTPSConnector
    
  7. After some time, check that the data is available in the FTPS working directory:

    You should see a file named /test_ftps_sink/partition=0/test_ftps_sink+0+0000000000.avro. The file name is encoded as topic+kafkaPartition+startOffset.format.

    To extract the content of the file, you can use avro-tools-1.8.2.jar (available in the Apache Archives).

  8. Move avro-tools-1.8.2.jar to the FTPS working directory and run the following command:

    java -jar avro-tools-1.8.2.jar tojson /<working dir>/test_ftps_sink/partition=0/test_ftps_sink+0+0000000000.avro
    

    You should see the following output:

    {"f1":"value1"}
    {"f1":"value2"}
    {"f1":"value3"}
    
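
The quick start above loads the connector with the Confluent CLI against a local installation. If you run Kafka Connect in distributed mode instead, you can submit the same configuration as JSON to the Connect REST API. The sketch below assumes the worker’s REST interface is listening on the default localhost:8083 and leaves confluent.license empty for the trial license:

curl -X POST -H "Content-Type: application/json" --data '{
  "name": "FTPSConnector",
  "config": {
    "connector.class": "io.confluent.connect.ftps.FtpsSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "tasks.max": "3",
    "topics": "test_ftps_sink",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": "",
    "format.class": "io.confluent.connect.ftps.sink.format.avro.AvroFormat",
    "flush.size": "100",
    "ftps.host": "<host-address>",
    "ftps.port": "<port-number>",
    "ftps.username": "<username>",
    "ftps.password": "<password>",
    "ftps.working.dir": "<destination-directory-on-the-server>",
    "ftps.ssl.key.password": "<password>",
    "ftps.ssl.keystore.location": "<path-to-keystore>",
    "ftps.ssl.keystore.password": "<password>",
    "ftps.ssl.truststore.location": "<path-to-truststore>",
    "ftps.ssl.truststore.password": "<password>"
  }
}' http://localhost:8083/connectors

The config keys mirror the quickstart-ftps.properties file; the name field takes the place of the name property.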

FTPS File Names

The FTPS data model is a flat structure. Records are written into files, and the name of each file serves as its unique key. Generally, the hierarchy of files in which records get stored follows this format:

<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>

Where:

  • <prefix> is specified with the connector’s ftps.working.dir configuration property.
  • <topic> corresponds to the name of the Kafka topic from which the records were read.
  • <encodedPartition> is generated by the FTPS sink connector’s partitioner and is of the format partition=<kafka-partition-number>.
  • <kafkaPartition> is the Kafka partition number from which the records were read.
  • <startOffset> is the Kafka offset of the first record written to this file.
  • <format> is the extension identifying the format in which the records are serialized in this FTPS file.
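
For example, assuming ftps.working.dir=/data/exports and the Avro format, a file holding records read from partition 2 of the topic test_ftps_sink, whose first record has offset 1500, would be named:

/data/exports/test_ftps_sink/partition=2/test_ftps_sink+2+0000001500.avro

The values here are purely illustrative; the zero-padded offset matches the naming shown in the quick start.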

Partitioning Records into FTPS files

The FTPS sink connector’s partitioner determines how records read from a Kafka topic are partitioned into FTPS directories. The partitioner preserves the same topic partitions as in Kafka, and records from each topic partition ultimately end up in an FTPS file with a name that includes the Kafka topic and Kafka partition.

FTPS File Formats

The FTPS sink connector can serialize multiple records into each FTPS file using a number of formats. The connector’s format.class configuration property identifies the name of the Java class that implements the io.confluent.connect.storage.format.Format interface. The FTPS sink connector comes with several implementations:

  • Avro: Use format.class=io.confluent.connect.ftps.sink.format.avro.AvroFormat to write the FTPS file as an Avro container file, including the Avro schema in the container file followed by one or more records. The connector’s avro.codec configuration property specifies the Avro compression codec (an example configuration follows this list). Valid values are:

    • null (the default) for no Avro compression
    • deflate to use the deflate algorithm as specified in RFC 1951
    • snappy to use Google’s Snappy compression library
    • bzip2 for BZip2 compression

    Optionally set enhanced.avro.schema.support=true to enable enum symbol preservation and package name awareness.

  • JSON: Use format.class=io.confluent.connect.ftps.sink.format.json.JsonFormat to write the FTPS file containing one JSON serialized record per line. The connector’s ftps.compression.type configuration property can be set to none (the default) for no compression or gzip for GZip compression.

  • Parquet: Use format.class=io.confluent.connect.ftps.sink.format.parquet.ParquetFormat to write the FTPS file as a Parquet container file, including the Parquet schema in the container file followed by one or more records.

  • CSV: Use format.class=io.confluent.connect.ftps.sink.format.csv.CsvFormat to write the FTPS file containing a comma-separated line for each record.

  • TSV: Use format.class=io.confluent.connect.ftps.sink.format.tsv.TsvFormat to write the FTPS file containing a tab-separated line for each record.
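
As an illustration of the properties described in this list, a minimal sketch of the format-related portion of a connector configuration might look like the following (Avro with Snappy compression, with a JSON/GZip alternative commented out):

# Avro container files compressed with Snappy
format.class=io.confluent.connect.ftps.sink.format.avro.AvroFormat
avro.codec=snappy
enhanced.avro.schema.support=true

# Alternatively, JSON files compressed with GZip:
# format.class=io.confluent.connect.ftps.sink.format.json.JsonFormat
# ftps.compression.type=gzip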