FTPS Source Connector for Confluent Platform

Caution

Preview connectors aren’t currently supported, nor are they recommended for production use.

The Kafka Connect FTPS connector is used to integrate with FTPS servers. The Kafka Connect FTPS Source connector watches a directory on an FTPS server and reads data as new files are written to the FTPS input directory. Each record in an input file is converted based on a user-supplied schema or an auto-generated schema and sent to an Apache Kafka® topic.

Features

The FTPS Source connector includes the following features:

At least once delivery

This connector guarantees that records are delivered to the Kafka topic at least once. If the connector restarts, there may be some duplicate records in the Kafka topic.
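Because delivery is at least once, downstream consumers generally need to tolerate repeated records. The following is a minimal sketch of consumer-side deduplication. It assumes each record carries a unique field (here a hypothetical `id`, like the one in the JSON example later in this document) and it is not part of the connector.

```python
def drop_duplicates(records, key="id"):
    """Return records in arrival order, keeping the first record per key value."""
    seen = set()
    unique = []
    for record in records:
        marker = record[key]
        if marker in seen:
            continue  # already processed, for example redelivered after a restart
        seen.add(marker)
        unique.append(record)
    return unique


# Hypothetical redelivery: record 2 arrives twice after a connector restart.
received = [{"id": 1}, {"id": 2}, {"id": 2}, {"id": 3}]
print(drop_duplicates(received))  # [{'id': 1}, {'id': 2}, {'id': 3}]
```

In a long-running consumer the set of seen keys would need to be bounded or persisted; the sketch keeps it in memory for brevity.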

Supported file types

The connector currently supports parsing CSV, JSON, and TXT files using the same connector. It uses a file’s extension to determine the file type and the parser to use. The file types to be parsed can be specified using a configuration parameter.
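As an illustration of extension-based dispatch, the sketch below maps a file name to one of the three supported types. The lookup table, the function name, and the case-insensitive comparison are assumptions made for the example, not the connector's actual implementation.

```python
from pathlib import PurePosixPath

# Hypothetical lookup table: extension -> file type named in this document.
FILE_TYPES = {".csv": "CSV", ".json": "JSON", ".txt": "TXT"}


def file_type_for(name):
    """Return the file type implied by the extension, or None if unsupported."""
    suffix = PurePosixPath(name).suffix.lower()  # assumption: case-insensitive
    return FILE_TYPES.get(suffix)


for name in ("orders.csv", "users.json", "notes.txt", "archive.zip"):
    print(name, "->", file_type_for(name))
```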

Multiple tasks

The connector supports running multiple tasks simultaneously. The number of tasks is set with the tasks.max configuration property. Running several tasks can significantly improve throughput when multiple files need to be parsed.

File pattern-based filtering

The connector supports filtering files based on a regular expression, which can also be used to parse only specific kinds of files. For example, setting ftps.input.file.pattern to test.*\.(csv|json) causes the connector to parse only CSV and JSON files whose names begin with test and to ignore all other files.
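A pattern can be checked against sample file names before it goes into the connector configuration. The sketch below uses Python's re module with the pattern test.*\.(csv|json), which selects CSV and JSON names beginning with test. It assumes the pattern must match the whole file name. The connector is a Java component, so its regular expression dialect and matching rules may differ in edge cases, and the file names are made up.

```python
import re

# Pattern under test: names starting with "test" and ending in .csv or .json.
PATTERN = re.compile(r"test.*\.(csv|json)")


def selected(names, pattern=PATTERN):
    """Return the names that the pattern matches in full."""
    return [name for name in names if pattern.fullmatch(name)]


candidates = ["test1.csv", "test-users.json", "test.txt", "prod.csv"]
print(selected(candidates))  # ['test1.csv', 'test-users.json']
```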

Automatic schema generation

Automatic schema generation is supported for CSV and JSON files. The connector uses the first record in a file to generate the schema for that file.

Limitations

  • Automatic schema generation is not supported for nested JSON. Instead, schemaless JSON parsing can be used by setting ftps.schemaless.json.parsing to true.
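To make the idea concrete, here is a rough sketch of deriving a flat schema from the first JSON record and rejecting nested values. The function name, the use of Python type names, and the choice to treat arrays as nested are illustrative assumptions; the connector produces Kafka Connect schemas, and its inference rules are not described in this document.

```python
import json


def infer_flat_schema(first_line):
    """Map each field of the first record to the name of its Python type."""
    record = json.loads(first_line)
    schema = {}
    for field, value in record.items():
        # Assumption: objects and arrays both count as "nested".
        if isinstance(value, (dict, list)):
            raise ValueError(f"field {field!r} is nested; use schemaless parsing")
        schema[field] = type(value).__name__
    return schema


first = '{"id": 1, "first_name": "Roscoe", "account_balance": 1450.68}'
print(infer_flat_schema(first))
# {'id': 'int', 'first_name': 'str', 'account_balance': 'float'}
```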

Install the FTPS Source Connector

The FTPS connector is compatible with Confluent Platform 4.1 and later. Earlier versions do not work with this connector.

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.
  • If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
  • Kafka Broker: Confluent Platform 3.3.0 or later.
  • Connect: Confluent Platform 4.1.0 or later.
  • Java 1.8.
  • All the required directories and files on the FTPS server must be accessible by the connector.
  • The schema can change with each file. For this reason, Confluent recommends setting the topic schema property to NONE.

Install the connector using the Confluent CLI

To install the latest connector version using the Confluent CLI, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-ftps:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-ftps:1.0.4

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Licensing for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for FTPS Source Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Quick Start

In this Quick Start, you will configure the FTPS Source connector to copy data from files on an FTPS server to a Kafka topic.

  1. Install the connector using the Confluent CLI.

    # run from your confluent platform installation directory
    confluent connect plugin install confluentinc/kafka-connect-ftps:latest
    
  2. Start the Confluent Platform.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services start
    
  3. Check the status of all services.

    confluent local services status
    
  4. Configure the connector by creating a file named ftps.properties with the following properties.

    # Substitute each <...> placeholder with your own value.
    name=FTPSConnector
    connector.class=io.confluent.connect.ftps.FtpsSourceConnector
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    tasks.max=3
    kafka.topic=<kafka-topic>
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    confluent.license=<License. Leave it empty for evaluation license>
    batch.size=100
    ftps.input.path=<Path to files on the server>
    ftps.host=<host-address>
    ftps.username=<username>
    ftps.password=<password>
    ftps.ssl.key.password=<password>
    ftps.ssl.keystore.location=<path-to-keystore>
    ftps.ssl.keystore.password=<password>
    ftps.ssl.truststore.location=<path-to-truststore>
    ftps.ssl.truststore.password=<password>
    
  5. Start the FTPS Source connector by loading the connector’s configuration with the following command:

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load FTPSConnector --config ftps.properties
    
  6. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status FTPSConnector
    
  7. Confirm the messages were delivered to the configured topic in Kafka.

    confluent local services kafka consume <kafka-topic> --from-beginning
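The connector state can also be read from the Kafka Connect REST API, which exposes GET /connectors/<name>/status. The sketch below assumes a Connect worker listening on http://localhost:8083 (the usual default for a local worker) and the connector name FTPSConnector from the properties file above; adjust both for your environment.

```python
import json
from urllib.request import urlopen


def fetch_status(name, base_url="http://localhost:8083"):
    """Fetch the status document for one connector from the Connect REST API."""
    with urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def is_running(status):
    """Return True if the connector and every reported task are RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)
```

With the services started, `is_running(fetch_status("FTPSConnector"))` would return True once the connector and its tasks are up. A status with no tasks listed yet is treated as running if the connector itself reports RUNNING.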
    

Examples

JSON file with schema

This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.

  1. Generate a JSON file using the command below and push it to the FTPS server:

    echo $'{"id":1,"first_name":"Roscoe","last_name":"Brentnall","email":"rbrentnall0@mediafire.com","gender":"Male","ip_address":"202.84.142.254","last_login":"2018-02-12T06:26:23Z","account_balance":1450.68,"country":"CZ","favorite_color":"#4eaefa"}\n{"id":2,"first_name":"Gregoire","last_name":"Fentem","email":"gfentem1@nsw.gov.au","gender":"Male","ip_address":"221.159.106.63","last_login":"2015-03-27T00:29:56Z","account_balance":1392.37,"country":"ID","favorite_color":"#e8f686"}' > "json-ftps-source.json"
    
  2. Create an ftps.properties file with the following contents:

    name=JsonFTPS
    tasks.max=1
    connector.class=io.confluent.connect.ftps.FtpsSourceConnector
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    kafka.topic=ftps-json-topic
    ftps.input.file.pattern=json-ftps-source\.json
    ftps.input.path=<Path to files on the server>
    ftps.host=<host-address>
    ftps.username=<username>
    ftps.password=<password>
    ftps.port=<port-number>
    schema.generation.enabled=false
    ftps.ssl.key.password=<password>
    ftps.ssl.keystore.location=<path-to-keystore>
    ftps.ssl.keystore.password=<password>
    ftps.ssl.truststore.location=<path-to-truststore>
    ftps.ssl.truststore.password=<password>
    key.schema={\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false}}}
    value.schema={\"name\" : \"com.example.users.User\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false},\"first_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"email\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"gender\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"ip_address\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_login\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"account_balance\" : {\"name\" : \"org.apache.kafka.connect.data.Decimal\",\"type\" : \"BYTES\",\"version\" : 1,\"parameters\" : {\"scale\" : \"2\"},\"isOptional\" : true},\"country\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"favorite_color\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}
    
  3. Load the FTPS Source connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load JsonFTPS --config ftps.properties
    

    Important

    Don’t use the Confluent CLI in production environments.
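Writing key.schema and value.schema by hand is error-prone because of the escaped quotes. As one possible approach, the hypothetical helper below renders a schema held as a Python dictionary into a single properties line in the same escaped style used above. It assumes that names and values contain no quotes or backslashes of their own.

```python
import json


def schema_property(name, schema):
    """Render a schema dict as one 'name=value' line with escaped quotes."""
    compact = json.dumps(schema, separators=(",", ":"))
    return name + "=" + compact.replace('"', '\\"')


# The key schema from the example above, expressed as a dictionary.
key_schema = {
    "name": "com.example.users.UserKey",
    "type": "STRUCT",
    "isOptional": False,
    "fieldSchemas": {"id": {"type": "INT64", "isOptional": False}},
}

print(schema_property("key.schema", key_schema))
```

The printed line can be pasted into ftps.properties. It omits the spaces around colons used in the hand-written version, which does not change the JSON content.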

CSV file with automatic schema generation

This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.

  1. Generate a CSV file using the command below and push it to the FTPS server:

    echo $'119736,FL,CLAY COUNTY,498960,498960,498960,498960,498960,792148.9,0,9979.2,0,0,30.102261,-81.711777,Residential,Masonry,1\n291781,FL,NASSAU COUNTY,0,723734.17,0,0,723734.17,955908.09,0,0,0,0,30.53674,-81.77496,Residential,Masonry,3' > "csv-ftps-source.csv"
    
  2. Create an ftps.properties file with the following contents:

    name=CsvFTPS
    tasks.max=1
    connector.class=io.confluent.connect.ftps.FtpsSourceConnector
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    kafka.topic=ftps-csv-topic
    ftps.input.file.pattern=csv-ftps-source\.csv
    ftps.input.path=<Path to files on the server>
    ftps.host=<host-address>
    ftps.port=<port-number>
    ftps.username=<username>
    ftps.password=<password>
    schema.generation.enabled=true
    ftps.ssl.key.password=<password>
    ftps.ssl.keystore.location=<path-to-keystore>
    ftps.ssl.keystore.password=<password>
    ftps.ssl.truststore.location=<path-to-truststore>
    ftps.ssl.truststore.password=<password>
    
  3. Load the FTPS Source connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load CsvFTPS --config ftps.properties
    

    Important

    Don’t use the Confluent CLI in production environments.
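Since the note above advises against the Confluent CLI in production, the same settings can instead be submitted to the Kafka Connect REST API. The sketch below converts a simple properties file into the JSON body for PUT /connectors/<name>/config. It assumes plain key=value lines without line continuations, keeps backslashes exactly as written, and targets a worker at http://localhost:8083; all of these are assumptions for illustration.

```python
import json
from urllib.request import Request, urlopen


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def put_connector_config(config, base_url="http://localhost:8083"):
    """Create or update the connector named by config['name']."""
    request = Request(
        f"{base_url}/connectors/{config['name']}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urlopen(request) as response:
        return json.load(response)
```

A call such as `put_connector_config(parse_properties(open("ftps.properties").read()))` would then take the place of the load command.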

TXT file

This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.

  1. Generate a TXT file using the command below and push it to the FTPS server:

    echo $'This is line 1.\nThis is line 2.\nThis is line 3.' > "txt-ftps-source.txt"
    
  2. Create an ftps.properties file with the following contents:

    name=TxtFTPS
    tasks.max=1
    connector.class=io.confluent.connect.ftps.FtpsSourceConnector
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    kafka.topic=ftps-txt-topic
    ftps.input.file.pattern=txt-ftps-source\.txt
    ftps.input.path=<Path to files on the server>
    ftps.host=<host-address>
    ftps.port=<port-number>
    ftps.username=<username>
    ftps.password=<password>
    ftps.ssl.key.password=<password>
    ftps.ssl.keystore.location=<path-to-keystore>
    ftps.ssl.keystore.password=<password>
    ftps.ssl.truststore.location=<path-to-truststore>
    ftps.ssl.truststore.password=<password>
    
  3. Load the FTPS Source connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load TxtFTPS --config ftps.properties
    

    Important

    Don’t use the Confluent CLI in production environments.
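The examples leave the upload step to you. One way to push a generated file to the server, sketched here with Python's standard ftplib module, follows. It assumes the server accepts explicit FTPS (AUTH TLS) with user name and password login and presents a certificate trusted by the system store; servers that use self-signed certificates, client certificates, or implicit FTPS need additional setup not shown. Host, port, credentials, and paths are placeholders.

```python
import os
import ssl
from ftplib import FTP_TLS


def stor_command(local_path):
    """Build the STOR command that uploads a file under its base name."""
    return "STOR " + os.path.basename(local_path)


def upload(local_path, remote_dir, host, port, user, password):
    """Upload one file over explicit FTPS with an encrypted data channel."""
    context = ssl.create_default_context()  # verifies the server certificate
    with FTP_TLS(context=context) as ftps:
        ftps.connect(host, port)
        ftps.login(user, password)  # secures the control connection before login
        ftps.prot_p()  # switch the data connection to TLS as well
        ftps.cwd(remote_dir)
        with open(local_path, "rb") as handle:
            ftps.storbinary(stor_command(local_path), handle)
```

For the TXT example, the call would look like `upload("txt-ftps-source.txt", "<Path to files on the server>", "<host-address>", 21, "<username>", "<password>")`, with the port replaced by the one your server uses.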

Parsing multiple file types at once

This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.

  1. Generate a JSON file using the command below and push it to the FTPS server:

    echo $'{"id":1,"first_name":"Roscoe","last_name":"Brentnall","email":"rbrentnall0@mediafire.com","gender":"Male","ip_address":"202.84.142.254","last_login":"2018-02-12T06:26:23Z","account_balance":1450.68,"country":"CZ","favorite_color":"#4eaefa"}\n{"id":2,"first_name":"Gregoire","last_name":"Fentem","email":"gfentem1@nsw.gov.au","gender":"Male","ip_address":"221.159.106.63","last_login":"2015-03-27T00:29:56Z","account_balance":1392.37,"country":"ID","favorite_color":"#e8f686"}' > "json-ftps-source.json"
    
  2. Generate a CSV file using the command below and push it to the FTPS server:

    echo $'119736,FL,CLAY COUNTY,498960,498960,498960,498960,498960,792148.9,0,9979.2,0,0,30.102261,-81.711777,Residential,Masonry,1\n291781,FL,NASSAU COUNTY,0,723734.17,0,0,723734.17,955908.09,0,0,0,0,30.53674,-81.77496,Residential,Masonry,3' > "csv-ftps-source.csv"
    
  3. Generate a TXT file using the command below and push it to the FTPS server:

    echo $'This is line 1.\nThis is line 2.\nThis is line 3.' > "txt-ftps-source.txt"
    
  4. The goal is to create a connector that parses only files whose names have the following format:

    <file-type>-ftps-source.<file-type>
    

    where <file-type> is the file type as it appears in the file name (for example, csv, txt, or json). A schema is generated automatically for CSV and JSON files.

  5. Create an ftps.properties file with the following contents:

    name=MixedFTPS
    tasks.max=3
    kafka.topic=ftps-mixed-topic
    connector.class=io.confluent.connect.ftps.FtpsSourceConnector
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    ftps.input.file.pattern=.+-ftps-source\..+
    ftps.input.path=<Path to files on the server>
    ftps.host=<host-address>
    ftps.username=<username>
    ftps.password=<password>
    ftps.port=<port-number>
    ftps.ssl.key.password=<password>
    ftps.ssl.keystore.location=<path-to-keystore>
    ftps.ssl.keystore.password=<password>
    ftps.ssl.truststore.location=<path-to-truststore>
    ftps.ssl.truststore.password=<password>
    
  6. Load the FTPS Source connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load MixedFTPS --config ftps.properties
    

    Important

    Don’t use the Confluent CLI in production environments.
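The pattern .+-ftps-source\..+ accepts any prefix and any extension, so it is looser than the <file-type>-ftps-source.<file-type> format described in step 4. If a stricter match is wanted, a backreference is one option. The sketch below compares the two patterns in Python, assuming whole-name matching. The stricter pattern is a suggestion rather than something taken from the connector documentation, and how its backslashes must be written in ftps.properties is outside the scope of the sketch.

```python
import re

LOOSE = re.compile(r".+-ftps-source\..+")
# \1 repeats whatever the first group matched, so prefix and extension agree.
STRICT = re.compile(r"(csv|json|txt)-ftps-source\.\1")

# Hypothetical directory listing; the last two names are not wanted.
NAMES = [
    "csv-ftps-source.csv",
    "json-ftps-source.json",
    "txt-ftps-source.txt",
    "csv-ftps-source.bak",
    "old-ftps-source.csv",
]


def matching(pattern, names):
    """Return the names that the pattern matches in full."""
    return [name for name in names if pattern.fullmatch(name)]


print(matching(LOOSE, NAMES))   # all five names
print(matching(STRICT, NAMES))  # only the first three names
```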