FTPS Source Connector for Confluent Platform

The Kafka Connect FTPS Source Connector watches a directory on an FTPS server and reads data as new files are written to the FTPS input directory. Each record in an input file is converted based on a user-supplied schema or an auto-generated schema and sent to an Apache Kafka® topic.

Features

The FTPS Source connector offers the following features:

  • Supported file types: The connector supports parsing CSV, JSON, and TXT files with a single connector instance. It uses a file’s extension to determine the file type and the parser to use. The file types to be parsed can be specified using a config parameter.
  • Multiple tasks: The connector supports running multiple tasks simultaneously, configured with the tasks.max configuration property. This can improve throughput considerably when multiple files need to be parsed.
  • File pattern-based filtering: The connector supports filtering files based on a regular expression, so it can be limited to specific kinds of files. For example, setting ftps.input.file.pattern to test.*\.(csv|json) causes the connector to parse only CSV and JSON files whose names begin with test and to ignore all other files.
  • At Least Once Delivery: The connector guarantees that records from the FTPS server are delivered at least once to the Kafka topic. If the connector restarts, the Kafka topic may contain some duplicate records.
  • Automatic schema generation: This is supported for CSV and JSON files. The first record in a file is used to generate the schema for the file.
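
The file pattern filter can be tried out offline before it is put into a connector configuration. The following is a minimal sketch, assuming the connector requires the pattern to match the whole file name (an assumption, not confirmed by this page) and using Python's re module as a stand-in for the Java regular expression engine. The pattern and the file names are illustrative only.

```python
import re

# Illustrative pattern: names that start with "test" and end in .csv or .json.
PATTERN = re.compile(r"test.*\.(csv|json)")


def select_files(names, pattern=PATTERN):
    """Return the names that the pattern matches in full."""
    return [name for name in names if pattern.fullmatch(name)]


# Hypothetical directory listing on the FTPS server.
candidates = ["test-orders.csv", "test1.json", "test.txt", "prod.csv", "mytest.csv"]
print(select_files(candidates))
```

Simple patterns like this one behave the same way in Python and Java, but more advanced constructs can differ between the two engines, so treat the result as a sanity check only.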

Limitations

  • Automatic schema generation is not supported for nested JSON. Instead, schemaless JSON parsing can be used by setting ftps.schemaless.json.parsing to true.
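
To decide whether a file needs schemaless parsing, it can help to check whether its records contain nested values. The sketch below is one possible pre-check, not part of the connector. It assumes one JSON object per line and treats both objects and arrays as nested values, which is an assumption about what "nested JSON" covers. The function name has_nested_values is hypothetical.

```python
import json


def has_nested_values(line):
    """Return True if a JSON object has an object or array among its values."""
    record = json.loads(line)
    if not isinstance(record, dict):
        raise ValueError("expected one JSON object per line")
    return any(isinstance(value, (dict, list)) for value in record.values())


flat = '{"id": 1, "country": "CZ"}'
nested = '{"id": 2, "address": {"country": "ID"}}'

for sample in (flat, nested):
    print(sample, "-> nested:", has_nested_values(sample))
```

If the check reports nested values, setting ftps.schemaless.json.parsing to true is the option described above.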

Prerequisites

The following are required to run the Kafka Connect FTPS Source Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above.
  • Connect: Confluent Platform 4.1.0 or above.
  • Java 1.8
  • All the required directories and files on the FTPS server must be accessible by the connector.
  • The schema can change with each file. For this reason, we recommend setting the topic schema property to NONE.

Install the FTPS Source Connector

You can install this connector by using the Confluent Hub Client instructions below or by manually downloading the ZIP file.

Install the connector using Confluent Hub

Prerequisite
The Confluent Hub Client must be installed. It is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-ftps:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-ftps:1.0.0-preview

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Licensing for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see FTPS Source Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Quick Start

In this Quick Start, you will configure the FTPS Source Connector to copy data from files on an FTPS server to the Kafka topic.

  1. Install the connector through the Confluent Hub Client.

    # run from your confluent platform installation directory
    confluent-hub install confluentinc/kafka-connect-ftps:latest
    
  2. Start the Confluent Platform.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services start
    
  3. Check the status of all services.

    confluent local services status

  4. Configure the connector by creating a properties file named ftps.properties with the following contents.

    # substitute <> with your config
    name=FTPSConnector
    connector.class=io.confluent.connect.ftps.FtpsSourceConnector
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    tasks.max=3
    kafka.topic=<kafka-topic>
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    confluent.license=<License. Leave it empty for evaluation license>
    batch.size=100
    ftps.input.path=<Path to files on the server>
    ftps.host=<host-address>
    ftps.username=<username>
    ftps.password=<password>
    ftps.ssl.key.password=<password>
    ftps.ssl.keystore.location=<path-to-keystore>
    ftps.ssl.keystore.password=<password>
    ftps.ssl.truststore.location=<path-to-truststore>
    ftps.ssl.truststore.password=<password>
    
  5. Start the FTPS Source connector by loading the connector’s configuration with the following command:

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load FTPSConnector --config ftps.properties
    
  6. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status FTPSConnector
    
  7. Confirm the messages were delivered to the configured topic in Kafka.

    confluent local services kafka consume <kafka-topic> --from-beginning
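
Outside a local development setup, connectors are typically created through the Kafka Connect REST API, which accepts a JSON body with a name and a config object on POST /connectors (port 8083 by default). The sketch below shows one way to turn a properties file such as ftps.properties into that JSON body. The helper names parse_properties and to_connect_payload are hypothetical, and the parser is deliberately simple: it handles key=value lines and comments but not backslash escapes or line continuations.

```python
import json


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and comments."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first "=" only, so values may contain "=".
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def to_connect_payload(config):
    """Build the {"name": ..., "config": ...} body used by POST /connectors."""
    config = dict(config)  # copy so the caller's dict is not modified
    name = config.pop("name")
    return {"name": name, "config": config}


# Shortened sample; placeholders in <> still need real values.
SAMPLE = """
# substitute <> with your config
name=FTPSConnector
connector.class=io.confluent.connect.ftps.FtpsSourceConnector
tasks.max=3
kafka.topic=<kafka-topic>
ftps.host=<host-address>
"""

payload = to_connect_payload(parse_properties(SAMPLE))
print(json.dumps(payload, indent=2))
```

The printed JSON can be saved to a file and sent to a Connect worker with any HTTP client.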
    

Examples

JSON file with schema

This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.

  1. Generate a JSON file using the command below and push it to the FTPS server:

    echo $'{"id":1,"first_name":"Roscoe","last_name":"Brentnall","email":"rbrentnall0@mediafire.com","gender":"Male","ip_address":"202.84.142.254","last_login":"2018-02-12T06:26:23Z","account_balance":1450.68,"country":"CZ","favorite_color":"#4eaefa"}\n{"id":2,"first_name":"Gregoire","last_name":"Fentem","email":"gfentem1@nsw.gov.au","gender":"Male","ip_address":"221.159.106.63","last_login":"2015-03-27T00:29:56Z","account_balance":1392.37,"country":"ID","favorite_color":"#e8f686"}' > "json-ftps-source.json"
    
  2. Create an ftps.properties file with the following contents:

    name=JsonFTPS
    tasks.max=1
    connector.class=io.confluent.connect.ftps.FtpsSourceConnector
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    kafka.topic=ftps-json-topic
    ftps.input.file.pattern=json-ftps-source\.json
    ftps.input.path=<Path to files on the server>
    ftps.host=<host-address>
    ftps.username=<username>
    ftps.password=<password>
    ftps.port=<port-number>
    schema.generation.enabled=false
    ftps.ssl.key.password=<password>
    ftps.ssl.keystore.location=<path-to-keystore>
    ftps.ssl.keystore.password=<password>
    ftps.ssl.truststore.location=<path-to-truststore>
    ftps.ssl.truststore.password=<password>
    key.schema={\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false}}}
    value.schema={\"name\" : \"com.example.users.User\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false},\"first_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"email\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"gender\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"ip_address\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_login\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"account_balance\" : {\"name\" : \"org.apache.kafka.connect.data.Decimal\",\"type\" : \"BYTES\",\"version\" : 1,\"parameters\" : {\"scale\" : \"2\"},\"isOptional\" : true},\"country\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"favorite_color\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}
    
  3. Load the FTPS Source Connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load JsonFTPS --config ftps.properties
    

    Important

    Don’t use the Confluent CLI in production environments.
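
Writing the key.schema and value.schema strings by hand is error-prone, so it can be convenient to generate them. The sketch below builds schema objects in the same shape as the ones in this example (name, type, isOptional, fieldSchemas) and escapes the double quotes the way the properties file above does. The helper names field, struct_schema, and as_property_value are hypothetical, and only a few fields are shown.

```python
import json


def field(type_name, optional=True, **extra):
    """Describe one field; extra keys (name, version, parameters) pass through."""
    schema = {"type": type_name, "isOptional": optional}
    schema.update(extra)
    return schema


def struct_schema(name, fields, optional=False):
    """Wrap field descriptions in a STRUCT schema."""
    return {"name": name, "type": "STRUCT", "isOptional": optional, "fieldSchemas": fields}


def as_property_value(schema):
    """Serialize to JSON and escape double quotes as in the example file."""
    return json.dumps(schema).replace('"', '\\"')


key_schema = struct_schema(
    "com.example.users.UserKey",
    {"id": field("INT64", optional=False)},
)
value_schema = struct_schema(
    "com.example.users.User",
    {
        "id": field("INT64", optional=False),
        "first_name": field("STRING"),
        "account_balance": field(
            "BYTES",
            name="org.apache.kafka.connect.data.Decimal",
            version=1,
            parameters={"scale": "2"},
        ),
    },
)

print("key.schema=" + as_property_value(key_schema))
print("value.schema=" + as_property_value(value_schema))
```

The generated lines differ from the hand-written ones only in whitespace around the colons, which does not change the JSON content.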

CSV file with automatic schema generation

This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.

  1. Generate a CSV file using the command below and push it to the FTPS server:

    echo $'119736,FL,CLAY COUNTY,498960,498960,498960,498960,498960,792148.9,0,9979.2,0,0,30.102261,-81.711777,Residential,Masonry,1\n291781,FL,NASSAU COUNTY,0,723734.17,0,0,723734.17,955908.09,0,0,0,0,30.53674,-81.77496,Residential,Masonry,3' > "csv-ftps-source.csv"
    
  2. Create an ftps.properties file with the following contents:

    name=CsvFTPS
    tasks.max=1
    connector.class=io.confluent.connect.ftps.FtpsSourceConnector
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    kafka.topic=ftps-csv-topic
    ftps.input.file.pattern=csv-ftps-source\.csv
    ftps.input.path=<Path to files on the server>
    ftps.host=<host-address>
    ftps.port=<port-number>
    ftps.username=<username>
    ftps.password=<password>
    schema.generation.enabled=true
    ftps.ssl.key.password=<password>
    ftps.ssl.keystore.location=<path-to-keystore>
    ftps.ssl.keystore.password=<password>
    ftps.ssl.truststore.location=<path-to-truststore>
    ftps.ssl.truststore.password=<password>
    
  3. Load the FTPS Source Connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load CsvFTPS --config ftps.properties
    

    Important

    Don’t use the Confluent CLI in production environments.
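
To illustrate the idea of deriving a schema from the first record of a file, the sketch below guesses one type per column from the first CSV row. It is not the connector's implementation: the column names (col_1, col_2, ...) and the type-guessing rule (integer, then float, otherwise string) are assumptions made for this illustration, and the types the connector actually generates may differ.

```python
import csv
import io


def guess_type(value):
    """Hypothetical rule: try integer, then float, otherwise string."""
    for caster, type_name in ((int, "INT64"), (float, "FLOAT64")):
        try:
            caster(value)
            return type_name
        except ValueError:
            continue
    return "STRING"


def schema_from_first_record(text):
    """Map hypothetical column names to guessed types using only the first row."""
    first_row = next(csv.reader(io.StringIO(text)))
    return {f"col_{i}": guess_type(v) for i, v in enumerate(first_row, start=1)}


# First row taken from the sample file above; later rows are not inspected.
SAMPLE = (
    "119736,FL,CLAY COUNTY,498960,498960,498960,498960,498960,792148.9,"
    "0,9979.2,0,0,30.102261,-81.711777,Residential,Masonry,1\n"
    "291781,FL,NASSAU COUNTY,0,723734.17,0,0,723734.17,955908.09,"
    "0,0,0,0,30.53674,-81.77496,Residential,Masonry,3\n"
)

schema = schema_from_first_record(SAMPLE)
for column, type_name in schema.items():
    print(column, type_name)
```

Because only the first record is inspected, a column whose first value looks like an integer but later contains decimals (the fourth column in the sample, for instance) would be typed from the first value alone.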

TXT file

This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.

  1. Generate a TXT file using the command below and push it to the FTPS server:

    echo $'This is line 1.\nThis is line 2.\nThis is line 3.' > "txt-ftps-source.txt"
    
  2. Create an ftps.properties file with the following contents:

    name=TxtFTPS
    tasks.max=1
    connector.class=io.confluent.connect.ftps.FtpsSourceConnector
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    kafka.topic=ftps-txt-topic
    ftps.input.file.pattern=txt-ftps-source\.txt
    ftps.input.path=<Path to files on the server>
    ftps.host=<host-address>
    ftps.port=<port-number>
    ftps.username=<username>
    ftps.password=<password>
    ftps.ssl.key.password=<password>
    ftps.ssl.keystore.location=<path-to-keystore>
    ftps.ssl.keystore.password=<password>
    ftps.ssl.truststore.location=<path-to-truststore>
    ftps.ssl.truststore.password=<password>
    
  3. Load the FTPS Source Connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load TxtFTPS --config ftps.properties
    

    Important

    Don’t use the Confluent CLI in production environments.

Parsing multiple file types at once

This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.

  1. Generate a JSON file using the command below and push it to the FTPS server:

    echo $'{"id":1,"first_name":"Roscoe","last_name":"Brentnall","email":"rbrentnall0@mediafire.com","gender":"Male","ip_address":"202.84.142.254","last_login":"2018-02-12T06:26:23Z","account_balance":1450.68,"country":"CZ","favorite_color":"#4eaefa"}\n{"id":2,"first_name":"Gregoire","last_name":"Fentem","email":"gfentem1@nsw.gov.au","gender":"Male","ip_address":"221.159.106.63","last_login":"2015-03-27T00:29:56Z","account_balance":1392.37,"country":"ID","favorite_color":"#e8f686"}' > "json-ftps-source.json"
    
  2. Generate a CSV file using the command below and push it to the FTPS server:

    echo $'119736,FL,CLAY COUNTY,498960,498960,498960,498960,498960,792148.9,0,9979.2,0,0,30.102261,-81.711777,Residential,Masonry,1\n291781,FL,NASSAU COUNTY,0,723734.17,0,0,723734.17,955908.09,0,0,0,0,30.53674,-81.77496,Residential,Masonry,3' > "csv-ftps-source.csv"
    
  3. Generate a TXT file using the command below and push it to the FTPS server:

    echo $'This is line 1.\nThis is line 2.\nThis is line 3.' > "txt-ftps-source.txt"
    
  4. The goal is to create a connector that parses only files whose names have the format <file-type>-ftps-source.<file-type>. A schema is generated automatically for the CSV and JSON files.

  5. Create an ftps.properties file with the following contents:

    name=MixedFTPS
    tasks.max=3
    kafka.topic=ftps-mixed-topic
    connector.class=io.confluent.connect.ftps.FtpsSourceConnector
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    ftps.input.file.pattern=.+-ftps-source\..+
    ftps.input.path=<Path to files on the server>
    ftps.host=<host-address>
    ftps.username=<username>
    ftps.password=<password>
    ftps.port=<port-number>
    ftps.ssl.key.password=<password>
    ftps.ssl.keystore.location=<path-to-keystore>
    ftps.ssl.keystore.password=<password>
    ftps.ssl.truststore.location=<path-to-truststore>
    ftps.ssl.truststore.password=<password>
    
  6. Load the FTPS Source Connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load MixedFTPS --config ftps.properties
    

    Important

    Don’t use the Confluent CLI in production environments.
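
The combination of pattern filtering and extension-based parser selection used in this example can be sketched as follows. This is an illustration only: it assumes the pattern must match the whole file name, uses Python's re module in place of the Java regular expression engine, and the "unsupported" label is a hypothetical placeholder because this page does not say how the connector treats other extensions.

```python
import os
import re

# Same pattern as ftps.input.file.pattern in the configuration above.
PATTERN = re.compile(r".+-ftps-source\..+")

# File types the connector is documented to parse, keyed by extension.
PARSERS = {".csv": "CSV", ".json": "JSON", ".txt": "TXT"}


def plan(names):
    """Map each matching file name to the parser its extension selects."""
    selected = {}
    for name in names:
        if not PATTERN.fullmatch(name):
            continue
        extension = os.path.splitext(name)[1].lower()
        selected[name] = PARSERS.get(extension, "unsupported")
    return selected


listing = [
    "json-ftps-source.json",
    "csv-ftps-source.csv",
    "txt-ftps-source.txt",
    "notes.txt",
    "xml-ftps-source.xml",
]
for name, parser in plan(listing).items():
    print(name, "->", parser)
```

The last file name shows that this pattern also matches extensions other than the three supported ones, so a stricter pattern may be preferable when unrelated files share the naming scheme.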