FTPS Source Connector for Confluent Platform¶
The Kafka Connect FTPS Source Connector provides the capability to watch a directory on an FTPS server for files and read the data as new files are written to the FTPS input directory. Each record in the input file is converted based on the user-supplied schema or an auto-generated schema and sent to an Apache Kafka® topic.
Features¶
The FTPS Source connector offers the following features:
- Supported file types: The connector currently supports parsing CSV, JSON, and TXT files using the same connector. It uses a file's extension to determine the file type and the parser to be used. The file types to be parsed can be specified using a config parameter.
- Multiple tasks: The connector supports running multiple tasks simultaneously, which can be configured with the tasks.max configuration property. This can lead to significant performance gains when multiple files need to be parsed.
- File pattern-based filtering: The connector supports filtering files based on a regular expression. This can also be used to parse only specific kinds of files. For example, setting ftps.input.file.pattern to test*+\.(csv|json) causes the connector to parse only CSV and JSON files beginning with test, while ignoring all other files (see the sketch after this list).
- At least once delivery: The connector guarantees that records from the FTPS server are delivered at least once to the Kafka topic. If the connector restarts, there can be some duplicate records in the Kafka topic.
- Automatic schema generation: This is supported for CSV and JSON files. The first record in a file is used to generate the schema for the file.
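As a minimal sketch, the multiple-tasks and file-pattern features can be combined in a configuration fragment like the following (the pattern reuses the example above; adjust it to match your own file names):
# Run up to three tasks in parallel for faster parsing
tasks.max=3
# Parse only CSV and JSON files whose names begin with "test"
ftps.input.file.pattern=test*+\.(csv|json)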
Limitations¶
- Automatic schema generation is not supported for nested JSON. Instead, schemaless JSON parsing can be used by setting ftps.schemaless.json.parsing to true.
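For nested JSON input, a minimal configuration sketch might disable automatic schema generation and enable schemaless parsing instead (pairing these two properties this way is an assumption; both appear individually elsewhere in this document):
# Assumes nested JSON input: skip schema generation and parse schemalessly
schema.generation.enabled=false
ftps.schemaless.json.parsing=true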
Prerequisites¶
The following are required to run the Kafka Connect FTPS Source Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above.
- Connect: Confluent Platform 4.1.0 or above.
- Java 1.8
- All the required directories and files on the FTPS server must be accessible by the connector.
- The schema can change with each file. For this reason, we recommend setting the topic schema property to NONE.
Install the FTPS Source Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-ftps:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-ftps:1.0.0-preview
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Licensing for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see FTPS Source Connector Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Quick Start¶
In this Quick Start, you will configure the FTPS Source Connector to copy data from files on an FTPS server to a Kafka topic.
Install the connector through the Confluent Hub Client.
# Run from your Confluent Platform installation directory
confluent-hub install confluentinc/kafka-connect-ftps:latest
Start the Confluent Platform.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.
confluent local start
Check the status of all services.
confluent local status
Configure your connector by first creating a .properties file named ftps.properties with the following properties:
# substitute <> with your config
name=FTPSConnector
connector.class=io.confluent.connect.ftps.FtpsSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
tasks.max=3
kafka.topic=<kafka-topic>
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=<License. Leave it empty for evaluation license>
batch.size=100
ftps.input.path=<Path to files on the server>
ftps.host=<host-address>
ftps.username=<username>
ftps.password=<password>
ftps.ssl.key.password=<password>
ftps.ssl.keystore.location=<path-to-keystore>
ftps.ssl.keystore.password=<password>
ftps.ssl.truststore.location=<path-to-truststore>
ftps.ssl.truststore.password=<password>
Start the FTPS Source connector by loading the connector’s configuration with the following command:
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local load ftps -- -d ftps.properties
Confirm that the connector is in a RUNNING state.
confluent local status FTPSConnector
Confirm the messages were delivered to the configured topic in Kafka.
confluent local consume <kafka-topic> -- --from-beginning
Examples¶
JSON file with schema¶
This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.
Generate a JSON file using the command below and push it to the FTPS server:
echo $'{"id":1,"first_name":"Roscoe","last_name":"Brentnall","email":"rbrentnall0@mediafire.com","gender":"Male","ip_address":"202.84.142.254","last_login":"2018-02-12T06:26:23Z","account_balance":1450.68,"country":"CZ","favorite_color":"#4eaefa"}\n{"id":2,"first_name":"Gregoire","last_name":"Fentem","email":"gfentem1@nsw.gov.au","gender":"Male","ip_address":"221.159.106.63","last_login":"2015-03-27T00:29:56Z","account_balance":1392.37,"country":"ID","favorite_color":"#e8f686"}' > "json-ftps-source.json"
Create an ftps.properties file with the following contents:
name=JsonFTPS
tasks.max=1
connector.class=io.confluent.connect.ftps.FtpsSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
kafka.topic=ftps-json-topic
ftps.input.file.pattern=json-ftps-source\.json
ftps.input.path=<Path to files on the server>
ftps.host=<host-address>
ftps.username=<username>
ftps.password=<password>
ftps.port=<port-number>
schema.generation.enabled=false
ftps.ssl.key.password=<password>
ftps.ssl.keystore.location=<path-to-keystore>
ftps.ssl.keystore.password=<password>
ftps.ssl.truststore.location=<path-to-truststore>
ftps.ssl.truststore.password=<password>
key.schema={\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false}}}
value.schema={\"name\" : \"com.example.users.User\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false},\"first_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"email\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"gender\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"ip_address\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_login\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"account_balance\" : {\"name\" : \"org.apache.kafka.connect.data.Decimal\",\"type\" : \"BYTES\",\"version\" : 1,\"parameters\" : {\"scale\" : \"2\"},\"isOptional\" : true},\"country\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"favorite_color\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}
Load the FTPS Source Connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local load JsonFTPS -- -d ftps.properties
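As in the Quick Start, confirm the messages were delivered to the configured topic:
confluent local consume ftps-json-topic -- --from-beginning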
Important
Don’t use the Confluent CLI in production environments.
CSV file with automatic schema generation¶
This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.
Generate a CSV file using the command below and push it to the FTPS server:
echo $'119736,FL,CLAY COUNTY,498960,498960,498960,498960,498960,792148.9,0,9979.2,0,0,30.102261,-81.711777,Residential,Masonry,1\n291781,FL,NASSAU COUNTY,0,723734.17,0,0,723734.17,955908.09,0,0,0,0,30.53674,-81.77496,Residential,Masonry,3' > "csv-ftps-source.csv"
Create an ftps.properties file with the following contents:
name=CsvFTPS
tasks.max=1
connector.class=io.confluent.connect.ftps.FtpsSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
kafka.topic=ftps-csv-topic
ftps.input.file.pattern=csv-ftps-source\.csv
ftps.input.path=<Path to files on the server>
ftps.host=<host-address>
ftps.port=<port-number>
ftps.username=<username>
ftps.password=<password>
schema.generation.enabled=true
ftps.ssl.key.password=<password>
ftps.ssl.keystore.location=<path-to-keystore>
ftps.ssl.keystore.password=<password>
ftps.ssl.truststore.location=<path-to-truststore>
ftps.ssl.truststore.password=<password>
Load the FTPS Source Connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local load CsvFTPS -- -d ftps.properties
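Confirm the messages were delivered to the ftps-csv-topic topic, as in the Quick Start:
confluent local consume ftps-csv-topic -- --from-beginning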
Important
Don’t use the Confluent CLI in production environments.
TXT file¶
This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.
Generate a TXT file using the command below and push it to the FTPS server:
echo $'This is line 1.\nThis is line 2.\nThis is line 3.' > "txt-ftps-source.txt"
Create an ftps.properties file with the following contents:
name=TxtFTPS
tasks.max=1
connector.class=io.confluent.connect.ftps.FtpsSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
kafka.topic=ftps-txt-topic
ftps.input.file.pattern=txt-ftps-source\.txt
ftps.input.path=<Path to files on the server>
ftps.host=<host-address>
ftps.port=<port-number>
ftps.username=<username>
ftps.password=<password>
ftps.ssl.key.password=<password>
ftps.ssl.keystore.location=<path-to-keystore>
ftps.ssl.keystore.password=<password>
ftps.ssl.truststore.location=<path-to-truststore>
ftps.ssl.truststore.password=<password>
Load the FTPS Source Connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local load TxtFTPS -- -d ftps.properties
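Confirm the messages were delivered to the ftps-txt-topic topic, as in the Quick Start:
confluent local consume ftps-txt-topic -- --from-beginning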
Important
Don’t use the Confluent CLI in production environments.
Parsing multiple file types at once¶
This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.
Generate a JSON file using the command below and push it to the FTPS server:
echo $'{"id":1,"first_name":"Roscoe","last_name":"Brentnall","email":"rbrentnall0@mediafire.com","gender":"Male","ip_address":"202.84.142.254","last_login":"2018-02-12T06:26:23Z","account_balance":1450.68,"country":"CZ","favorite_color":"#4eaefa"}\n{"id":2,"first_name":"Gregoire","last_name":"Fentem","email":"gfentem1@nsw.gov.au","gender":"Male","ip_address":"221.159.106.63","last_login":"2015-03-27T00:29:56Z","account_balance":1392.37,"country":"ID","favorite_color":"#e8f686"}' > "json-ftps-source.json"
Generate a CSV file using the command below and push it to the FTPS server:
echo $'119736,FL,CLAY COUNTY,498960,498960,498960,498960,498960,792148.9,0,9979.2,0,0,30.102261,-81.711777,Residential,Masonry,1\n291781,FL,NASSAU COUNTY,0,723734.17,0,0,723734.17,955908.09,0,0,0,0,30.53674,-81.77496,Residential,Masonry,3' > "csv-ftps-source.csv"
Generate a TXT file using the command below and push it to the FTPS server:
echo $'This is line 1.\nThis is line 2.\nThis is line 3.' > "txt-ftps-source.txt"
The goal is to create a connector that parses only files whose names match the format <file-type>-ftps-source.<file-type>. Schemas are generated automatically for the CSV and JSON files.
Create an ftps.properties file with the following contents:
name=MixedFTPS
tasks.max=3
kafka.topic=ftps-mixed-topic
connector.class=io.confluent.connect.ftps.FtpsSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
ftps.input.file.pattern=.+-ftps-source\..+
ftps.input.path=<Path to files on the server>
ftps.host=<host-address>
ftps.username=<username>
ftps.password=<password>
ftps.port=<port-number>
ftps.ssl.key.password=<password>
ftps.ssl.keystore.location=<path-to-keystore>
ftps.ssl.keystore.password=<password>
ftps.ssl.truststore.location=<path-to-truststore>
ftps.ssl.truststore.password=<password>
Load the FTPS Source Connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local load MixedFTPS -- -d ftps.properties
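Confirm the messages from all three files were delivered to the ftps-mixed-topic topic, as in the Quick Start:
confluent local consume ftps-mixed-topic -- --from-beginning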
Important
Don’t use the Confluent CLI in production environments.