SFTP Source Connector for Confluent Platform¶
The connectors in the Kafka Connect SFTP Source connector package provide the
capability to watch an SFTP directory for files and read the data as new files
are written to the SFTP input directory. Once a file has been read, it is placed
into the configured finished.path directory. Each record in the input file
is converted based on the user-supplied schema or an auto-generated schema.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
The following connectors are included in the SFTP Source connector package:
- CSV Source Connector for Confluent Platform
- JSON Source Connector for Confluent Platform
- Schemaless JSON Source Connector for Confluent Platform
- Binary File Source Connector for Confluent Platform
Features¶
The SFTP Source connectors include the following features:
At least once delivery¶
Each connector in the SFTP Source connector package guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
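If downstream processing cannot tolerate duplicates, one option is to deduplicate on the consumer side. The sketch below assumes you have already exported each record as a tab-separated line of source file path, record offset within the file, and record value; that export format, the sample values, and the dedupe_records helper are hypothetical and are not part of the connector.

```shell
# Hypothetical helper: keep the first line seen for each (file path, offset) pair.
# Expects tab-separated input: file path, record offset, record value.
dedupe_records() {
  awk -F '\t' '!seen[$1 FS $2]++'
}

# Sample input with one duplicated record (offset 2 appears twice).
printf '%s\t%s\t%s\n' \
  /path/to/data/csv-sftp-source.csv 1 'Salmon' \
  /path/to/data/csv-sftp-source.csv 2 'Debby' \
  /path/to/data/csv-sftp-source.csv 2 'Debby' | dedupe_records
```

The awk filter holds every key in memory, so it suits small batches; a long-running consumer would need a bounded or persistent store instead.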
Supports one task¶
The connectors in the SFTP Source connector package each support running only one task.
File Metadata¶
Starting with version 2.3.0, the SFTP Source connector supports exporting file metadata in Kafka headers. The following information is stored in each record’s headers:
file.path
- The path of the source file on the SFTP server.
file.name
- The name of the file from which the record was read.
file.name.without.extension
- The name of the file without the extension.
file.last.modified
- The last modification time of the file in seconds.
file.length
- The size of the file in bytes.
file.offset
- The offset of the record in the file. (N/A for Binary files)
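As a rough illustration of what these header values represent, the following sketch derives the analogous values for a local file with standard shell tools. It is not how the connector computes them: the temporary file, the variable names, and the rule used to strip the extension (everything after the last dot) are assumptions made for this example.

```shell
# Illustration only: compute values analogous to the file.* headers for a local file.
# The temp directory, file contents, and variable names are hypothetical.
dir=$(mktemp -d)
file_path="$dir/csv-sftp-source.csv"
printf 'id,first_name\n1,Salmon\n' > "$file_path"

file_name=$(basename "$file_path")              # analogous to file.name
file_name_without_extension="${file_name%.*}"   # analogous to file.name.without.extension (assumes last-dot rule)
file_length=$(( $(wc -c < "$file_path") ))      # analogous to file.length, in bytes

echo "file.path=$file_path"
echo "file.name=$file_name"
echo "file.name.without.extension=$file_name_without_extension"
echo "file.length=$file_length"
```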
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Limitations¶
The SFTP Source connector has the following limitation:
- Currently, the SFTP Source connector reads and moves a file only once while processing files from a specific SFTP directory. If a file with the same name is produced later in the same SFTP directory, the connector will not process it.
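One possible workaround, sketched here under assumptions, is to give each upload a unique name before pushing it to the SFTP directory, so that a later file does not reuse an earlier name. The unique_name helper and the timestamp format are hypothetical, the helper assumes the name has an extension, and input.file.pattern would have to be a pattern that matches the generated names.

```shell
# Hypothetical helper: insert a UTC timestamp before the file extension.
# Names differ only if uploads are at least one second apart.
unique_name() {
  name=$1
  stem="${name%.*}"     # part before the last dot
  ext="${name##*.}"     # part after the last dot
  printf '%s-%s.%s\n' "$stem" "$(date -u +%Y%m%dT%H%M%SZ)" "$ext"
}

unique_name "csv-sftp-source.csv"
```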
Configuration Properties¶
For a list of configuration properties for the SFTP Source connectors, refer to the specific connector documentation.
Install the SFTP Source connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
- Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.
- Java 1.8.
Install the connector using the Confluent CLI¶
To install the latest connector version using the Confluent CLI, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-sftp:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-sftp:3.2.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
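After installing the connector and restarting the worker, one way to check that the plugin was picked up is to query the Kafka Connect REST API's /connector-plugins endpoint and look for the connector class. The sketch below assumes a worker listening on localhost:8083 (the default REST port); the has_sftp_plugin helper is hypothetical and only searches the JSON it reads from standard input.

```shell
# Hypothetical helper: succeed if the JSON on stdin mentions an SFTP source connector class.
has_sftp_plugin() {
  grep -q 'io\.confluent\.connect\.sftp\.Sftp[A-Za-z]*SourceConnector'
}

# Example usage against a running worker (assumes the default REST port 8083):
#   curl -s http://localhost:8083/connector-plugins | has_sftp_plugin && echo "SFTP plugin found"
```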
Quick Start¶
The following steps show the SftpCsvSourceConnector loading a mock CSV file to a Kafka topic named sftp-testing-topic. The other connectors are similar but load from different file types.
Install the connector using the Confluent CLI.
# run from your Confluent Platform installation directory
confluent connect plugin install confluentinc/kafka-connect-sftp:latest
Tip
By default, the command installs the plugin into the share/confluent-hub-components directory and adds the directory to the plugin path. For the plugin path change to take effect, you must restart the Connect worker.
Start the Confluent Platform.
confluent local services start
Set up an SFTP data directory for files to process, generate test data locally, and push it to the SFTP server.
echo $'id,first_name,last_name,email,gender,ip_address,last_login,account_balance,country,favorite_color\n1,Salmon,Baitman,sbaitman0@feedburner.com,Male,120.181.75.98,2015-03-01T06:01:15Z,17462.66,IT,#f09bc0\n2,Debby,Brea,dbrea1@icio.us,Female,153.239.187.49,2018-10-21T12:27:12Z,14693.49,CZ,#73893a' > "csv-sftp-source.csv"
Set up SFTP directories for files with errors and files that finished successfully.
mkdir error && mkdir finished
Create a sftp.json file with the following contents:
{
  "name": "CsvSFTP",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
    "cleanup.policy": "NONE",
    "behavior.on.error": "IGNORE",
    "input.path": "/path/to/data",
    "error.path": "/path/to/error",
    "finished.path": "/path/to/finished",
    "input.file.pattern": "csv-sftp-source.csv",
    "sftp.username": "username",
    "sftp.password": "password",
    "sftp.host": "localhost",
    "sftp.port": "22",
    "kafka.topic": "sftp-testing-topic",
    "csv.first.row.as.header": "true",
    "schema.generation.enabled": "true"
  }
}
Load the SFTP CSV Source connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load CsvSFTP --config sftp.json
Important
Don’t use the Confluent CLI in production environments.
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status CsvSFTP
Confirm that the messages are being sent to Kafka.
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic sftp-testing-topic \
  --from-beginning | jq '.'
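If you set "cleanup.policy":"MOVE", confirm that the source CSV file has been moved to the finished directory. The sample sftp.json above uses "cleanup.policy":"NONE", so the file is not moved unless you change that setting.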
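Because the Confluent CLI is not meant for production use, the load and status steps can also be done against the Kafka Connect REST API directly. The following is a minimal sketch, assuming a Connect worker reachable at localhost:8083 (the default REST port); the CONNECT_URL variable and the load_connector and connector_status wrappers are hypothetical names introduced for this example.

```shell
# Hypothetical wrappers around the Kafka Connect REST API.
# CONNECT_URL is an assumption; 8083 is the default Connect REST port.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Create the connector from a file shaped like sftp.json ({"name": ..., "config": {...}}).
load_connector() {
  curl -s -X POST -H "Content-Type: application/json" \
    --data @"$1" "$CONNECT_URL/connectors"
}

# Fetch the connector and task states for the named connector.
connector_status() {
  curl -s "$CONNECT_URL/connectors/$1/status"
}

# Example usage (requires a running Connect worker):
#   load_connector sftp.json
#   connector_status CsvSFTP
```

The sftp.json file from the earlier step already has the name and config layout that the POST /connectors endpoint expects, so it can be reused unchanged.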