FTPS Source Connector for Confluent Platform¶
Caution
Preview connectors aren’t currently supported, nor are they recommended for production use.
The Kafka Connect FTPS connector is used to integrate with FTPS servers. The Kafka Connect FTPS Source connector provides the capability to watch a directory on an FTPS server for files and read the data as new files are written to the FTPS input directory. Each record in the input file is converted based on the user-supplied schema or an auto-generated schema and sent to an Apache Kafka® topic.
Features¶
The FTPS Source connector includes the following features:
- At least once delivery
- Supported file types
- Multiple tasks
- File pattern-based filtering
- Automatic schema generation
At least once delivery¶
This connector guarantees that records are delivered to the Kafka topic at least once. If the connector restarts, there may be some duplicate records in the Kafka topic.
Supported file types¶
A single connector can parse CSV, JSON, and TXT files. It uses a file's extension to determine the file type and the parser to use. The file types to be parsed can be specified using a config parameter.
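The connector's internal dispatch logic is not public, but the idea of extension-based parser selection can be sketched as follows (a hypothetical illustration; the parser names are not the connector's actual classes):

```python
from pathlib import Path

# Hypothetical mapping from file extension to parser name; illustrative
# only, not the connector's real dispatch table.
PARSERS = {".csv": "csv", ".json": "json", ".txt": "text"}

def parser_for(filename: str) -> str:
    """Pick a parser name from the file extension (case-insensitive)."""
    return PARSERS.get(Path(filename).suffix.lower(), "unsupported")

print(parser_for("orders.CSV"))   # csv
print(parser_for("notes.md"))     # unsupported
```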
Multiple tasks¶
The connector supports running multiple tasks simultaneously, which can be configured with the tasks.max configuration property. This can lead to significant performance gains when multiple files need to be parsed.
File pattern-based filtering¶
The connector supports filtering files based on a regular expression. This can also be used to parse only specific kinds of files. For example, setting ftps.input.file.pattern to test*+\.(csv|json) causes the connector to parse only CSV and JSON files beginning with test, while ignoring all other files.
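The connector evaluates the pattern with Java's regex engine. As a rough, hypothetical illustration in Python (whose re syntax is close for simple expressions), a similar pattern such as test.*\.(csv|json) keeps only CSV and JSON files whose names start with test:

```python
import re

# Illustrative variant of the pattern above; the connector itself
# matches filenames with Java's regex engine, not Python's.
pattern = re.compile(r"test.*\.(csv|json)")

files = ["test1.csv", "test-data.json", "notes.txt", "other.csv"]
print([f for f in files if pattern.fullmatch(f)])
# ['test1.csv', 'test-data.json']
```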
Automatic schema generation¶
This is supported for CSV and JSON files. The first record in a file is used to generate the schema for the file.
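The exact type-mapping rules are internal to the connector, but deriving a schema from a file's first record can be sketched as follows (hypothetical helper; the type names only loosely mirror Connect schema types):

```python
import json

# Hypothetical sketch: map each top-level field of the first record to a
# simple type name. The connector's real mapping is more elaborate.
TYPE_NAMES = {bool: "BOOLEAN", int: "INT64", float: "FLOAT64", str: "STRING"}

def infer_schema(record: dict) -> dict:
    """Build a flat field -> type-name mapping from one record."""
    return {field: TYPE_NAMES.get(type(value), "STRING")
            for field, value in record.items()}

first_line = '{"id": 1, "first_name": "Roscoe", "account_balance": 1450.68}'
print(infer_schema(json.loads(first_line)))
# {'id': 'INT64', 'first_name': 'STRING', 'account_balance': 'FLOAT64'}
```

Note that a sketch like this has no way to represent nested objects, which matches the limitation described below.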
Limitations¶
- Automatic schema generation is not supported for nested JSON. Instead, schemaless JSON parsing can be used by setting ftps.schemaless.json.parsing to true.
Install the FTPS Source Connector¶
The FTPS connector is compatible with Confluent Platform version 4.1 and later; prior versions do not work with this connector.
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- Kafka Broker: Confluent Platform 3.3.0 or later.
- Connect: Confluent Platform 4.1.0 or later.
- Java 1.8.
- All the required directories and files on the FTPS server must be accessible by the connector.
- The schema can change with each file. For this reason, Confluent recommends setting the topic schema property to NONE.
Install the connector using the Confluent CLI¶
To install the latest connector version using the Confluent CLI, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-ftps:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-ftps:1.0.4
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Licensing for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for FTPS Source Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Quick Start¶
In this Quick Start, you configure the FTPS Source connector to copy data from files on an FTPS server to a Kafka topic.
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory
confluent connect plugin install confluentinc/kafka-connect-ftps:latest
Start the Confluent Platform.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services start
Check the status of all services.
confluent local services status
Configure your connector by first creating a .properties file named ftps.properties with the following properties:
# substitute <> with your config
name=FTPSConnector
connector.class=io.confluent.connect.ftps.FtpsSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
tasks.max=3
kafka.topic=<kafka-topic>
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=<License. Leave it empty for evaluation license>
batch.size=100
ftps.input.path=<Path to files on the server>
ftps.host=<host-address>
ftps.username=<username>
ftps.password=<password>
ftps.ssl.key.password=<password>
ftps.ssl.keystore.location=<path-to-keystore>
ftps.ssl.keystore.password=<password>
ftps.ssl.truststore.location=<path-to-truststore>
ftps.ssl.truststore.password=<password>
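The Confluent CLI used in this Quick Start is meant for development; on a production Connect cluster, the same settings are typically submitted as JSON to the Kafka Connect REST API (POST /connectors). A minimal, hypothetical sketch of converting simple key=value properties into that payload:

```python
import json

def properties_to_connect_payload(text: str) -> str:
    """Convert simple key=value lines into the JSON body expected by
    POST /connectors (sketch only; ignores the escape and continuation
    rules of full Java .properties files)."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    # The Connect REST API takes the connector name at the top level,
    # not inside the config map.
    name = config.pop("name", "connector")
    return json.dumps({"name": name, "config": config}, indent=2)

example = "name=FTPSConnector\ntasks.max=3\nkafka.topic=my-topic"
print(properties_to_connect_payload(example))
```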
Start the FTPS Source connector by loading the connector’s configuration with the following command:
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load ftps --config ftps.properties
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status FTPSConnector
Confirm the messages were delivered to the configured topic in Kafka.
confluent local services kafka consume <kafka-topic> --from-beginning
Examples¶
JSON file with schema¶
This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.
Generate a JSON file using the command below and push it to the FTPS server:
echo $'{"id":1,"first_name":"Roscoe","last_name":"Brentnall","email":"rbrentnall0@mediafire.com","gender":"Male","ip_address":"202.84.142.254","last_login":"2018-02-12T06:26:23Z","account_balance":1450.68,"country":"CZ","favorite_color":"#4eaefa"}\n{"id":2,"first_name":"Gregoire","last_name":"Fentem","email":"gfentem1@nsw.gov.au","gender":"Male","ip_address":"221.159.106.63","last_login":"2015-03-27T00:29:56Z","account_balance":1392.37,"country":"ID","favorite_color":"#e8f686"}' > "json-ftps-source.json"
Create an ftps.properties file with the following contents:
name=JsonFTPS
tasks.max=1
connector.class=io.confluent.connect.ftps.FtpsSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
kafka.topic=ftps-json-topic
ftps.input.file.pattern=json-ftps-source\.json
ftps.input.path=<Path to files on the server>
ftps.host=<host-address>
ftps.username=<username>
ftps.password=<password>
ftps.port=<port-number>
schema.generation.enabled=false
ftps.ssl.key.password=<password>
ftps.ssl.keystore.location=<path-to-keystore>
ftps.ssl.keystore.password=<password>
ftps.ssl.truststore.location=<path-to-truststore>
ftps.ssl.truststore.password=<password>
key.schema={\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false}}}
value.schema={\"name\" : \"com.example.users.User\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false},\"first_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"email\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"gender\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"ip_address\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_login\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"account_balance\" : {\"name\" : \"org.apache.kafka.connect.data.Decimal\",\"type\" : \"BYTES\",\"version\" : 1,\"parameters\" : {\"scale\" : \"2\"},\"isOptional\" : true},\"country\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"favorite_color\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}
Load the FTPS Source connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load JsonFTPS --config ftps.properties
Important
Don’t use the Confluent CLI in production environments.
CSV file with automatic schema generation¶
This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.
Generate a CSV file using the command below and push it to the FTPS server:
echo $'119736,FL,CLAY COUNTY,498960,498960,498960,498960,498960,792148.9,0,9979.2,0,0,30.102261,-81.711777,Residential,Masonry,1\n291781,FL,NASSAU COUNTY,0,723734.17,0,0,723734.17,955908.09,0,0,0,0,30.53674,-81.77496,Residential,Masonry,3' > "csv-ftps-source.csv"
Create an ftps.properties file with the following contents:
name=CsvFTPS
tasks.max=1
connector.class=io.confluent.connect.ftps.FtpsSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
kafka.topic=ftps-csv-topic
ftps.input.file.pattern=csv-ftps-source\.csv
ftps.input.path=<Path to files on the server>
ftps.host=<host-address>
ftps.port=<port-number>
ftps.username=<username>
ftps.password=<password>
schema.generation.enabled=true
ftps.ssl.key.password=<password>
ftps.ssl.keystore.location=<path-to-keystore>
ftps.ssl.keystore.password=<password>
ftps.ssl.truststore.location=<path-to-truststore>
ftps.ssl.truststore.password=<password>
Load the FTPS Source connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load CsvFTPS --config ftps.properties
Important
Don’t use the Confluent CLI in production environments.
TXT file¶
This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.
Generate a TXT file using the command below and push it to the FTPS server:
echo $'This is line 1.\nThis is line 2.\nThis is line 3.' > "txt-ftps-source.txt"
Create an ftps.properties file with the following contents:
name=TxtFTPS
tasks.max=1
connector.class=io.confluent.connect.ftps.FtpsSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
kafka.topic=ftps-txt-topic
ftps.input.file.pattern=txt-ftps-source\.txt
ftps.input.path=<Path to files on the server>
ftps.host=<host-address>
ftps.port=<port-number>
ftps.username=<username>
ftps.password=<password>
ftps.ssl.key.password=<password>
ftps.ssl.keystore.location=<path-to-keystore>
ftps.ssl.keystore.password=<password>
ftps.ssl.truststore.location=<path-to-truststore>
ftps.ssl.truststore.password=<password>
Load the FTPS Source connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load TxtFTPS --config ftps.properties
Important
Don’t use the Confluent CLI in production environments.
Parsing multiple file types at once¶
This example follows the same steps as the Quick Start. Review the above Quick Start for help running the Confluent Platform and installing the FTPS connector package.
Generate a JSON file using the command below and push it to the FTPS server:
echo $'{"id":1,"first_name":"Roscoe","last_name":"Brentnall","email":"rbrentnall0@mediafire.com","gender":"Male","ip_address":"202.84.142.254","last_login":"2018-02-12T06:26:23Z","account_balance":1450.68,"country":"CZ","favorite_color":"#4eaefa"}\n{"id":2,"first_name":"Gregoire","last_name":"Fentem","email":"gfentem1@nsw.gov.au","gender":"Male","ip_address":"221.159.106.63","last_login":"2015-03-27T00:29:56Z","account_balance":1392.37,"country":"ID","favorite_color":"#e8f686"}' > "json-ftps-source.json"
Generate a CSV file using the command below and push it to the FTPS server:
echo $'119736,FL,CLAY COUNTY,498960,498960,498960,498960,498960,792148.9,0,9979.2,0,0,30.102261,-81.711777,Residential,Masonry,1\n291781,FL,NASSAU COUNTY,0,723734.17,0,0,723734.17,955908.09,0,0,0,0,30.53674,-81.77496,Residential,Masonry,3' > "csv-ftps-source.csv"
Generate a TXT file using the command below and push it to the FTPS server:
echo $'This is line 1.\nThis is line 2.\nThis is line 3.' > "txt-ftps-source.txt"
The goal is to create a connector that parses only files having a filename with the following format:
<file-type>-ftps-source.<file-type>
where <file-type> is the type of file (for example, CSV, TXT, or JSON). A schema is automatically generated for CSV and JSON files.
Create an ftps.properties file with the following contents:
name=MixedFTPS
tasks.max=3
kafka.topic=ftps-mixed-topic
connector.class=io.confluent.connect.ftps.FtpsSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
ftps.input.file.pattern=.+-ftps-source\..+
ftps.input.path=<Path to files on the server>
ftps.host=<host-address>
ftps.username=<username>
ftps.password=<password>
ftps.port=<port-number>
ftps.ssl.key.password=<password>
ftps.ssl.keystore.location=<path-to-keystore>
ftps.ssl.keystore.password=<password>
ftps.ssl.truststore.location=<path-to-truststore>
ftps.ssl.truststore.password=<password>
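As a quick, hypothetical sanity check (in Python; the connector itself uses Java's regex engine), the ftps.input.file.pattern value .+-ftps-source\..+ matches exactly the three generated filenames:

```python
import re

# The ftps.input.file.pattern value from the config above.
pattern = re.compile(r".+-ftps-source\..+")

candidates = ["json-ftps-source.json", "csv-ftps-source.csv",
              "txt-ftps-source.txt", "README.md", "ftps-source.csv"]
print([f for f in candidates if pattern.fullmatch(f)])
# ['json-ftps-source.json', 'csv-ftps-source.csv', 'txt-ftps-source.txt']
```

Note that ftps-source.csv is rejected because the leading .+ requires at least one character before -ftps-source.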
Load the FTPS Source connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load MixedFTPS --config ftps.properties
Important
Don’t use the Confluent CLI in production environments.