SFTP Source Connector for Confluent Platform
The connectors in the Kafka Connect SFTP Source Connector package watch an SFTP directory and read data as new files are written to the SFTP input directory. Once a file has been read, it is placed into the configured finished.path directory. Each record in the input file is converted based on the user-supplied schema or an auto-generated schema.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
The following connectors are included in the SFTP Source Connector package:
- CSV Source Connector for Confluent Platform
- JSON Source Connector for Confluent Platform
- Schemaless JSON Source Connector for Confluent Platform
Quick Start
The following steps show the SftpCsvSourceConnector loading a mock CSV file to a Kafka topic named sftp-testing-topic. The other connectors are similar but load from different file types.
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory
confluent-hub install confluentinc/kafka-connect-sftp:latest
Tip
By default, the plugin is installed into share/confluent-hub-components and the sftp directory is added to the plugin path. If this is the first connector you have installed, you may need to restart the Connect server for the plugin path change to take effect.
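Before restarting, you can confirm that the plugin landed where the worker expects it. A minimal check, assuming the default share/confluent-hub-components location mentioned above (adjust the path to your installation):
# run from your Confluent Platform installation directory
# the sftp connector directory should appear under the plugin path
ls share/confluent-hub-components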
Start the Confluent Platform.
confluent local start
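Before continuing, you can confirm that the local services came up. A sketch using the same local CLI (service names and output vary by Confluent Platform version):
# list the status of the local Confluent Platform services
confluent local status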
Set up an SFTP data directory for files to process, generate test data locally, and push it to the SFTP server.
echo $'id,first_name,last_name,email,gender,ip_address,last_login,account_balance,country,favorite_color\n1,Salmon,Baitman,sbaitman0@feedburner.com,Male,120.181.75.98,2015-03-01T06:01:15Z,17462.66,IT,#f09bc0\n2,Debby,Brea,dbrea1@icio.us,Female,153.239.187.49,2018-10-21T12:27:12Z,14693.49,CZ,#73893a' > "csv-sftp-source.csv"
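The echo command only creates the file locally; it still needs to reach the SFTP input directory. A sketch of one way to push it, using the placeholder credentials and the /path/to/data input directory from the sftp.json configuration below:
# upload the test file to the SFTP input directory
# (username, localhost, and /path/to/data are the placeholders from sftp.json)
sftp username@localhost <<EOF
put csv-sftp-source.csv /path/to/data/
EOF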
Set up SFTP directories for files with errors and files that finished successfully.
mkdir error && mkdir finished
Create a sftp.json file with the following contents:
{
  "name": "CsvSFTP",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
    "cleanup.policy": "NONE",
    "behavior.on.error": "IGNORE",
    "input.path": "/path/to/data",
    "error.path": "/path/to/error",
    "finished.path": "/path/to/finished",
    "input.file.pattern": "csv-sftp-source.csv",
    "sftp.username": "username",
    "sftp.password": "password",
    "sftp.host": "localhost",
    "sftp.port": "22",
    "kafka.topic": "sftp-testing-topic",
    "csv.first.row.as.header": "true",
    "schema.generation.enabled": "true"
  }
}
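Note that this configuration sets "cleanup.policy":"NONE", so processed files are not moved. The final step of this quick start checks the finished directory, which applies when the policy is MOVE instead; a one-line sketch of that change in sftp.json:
"cleanup.policy": "MOVE",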
Load the SFTP CSV Source Connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local load CsvSFTP -- -d sftp.json
Important
Don’t use the Confluent CLI in production environments.
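If you are avoiding the Confluent CLI, the same sftp.json payload can be submitted to the Kafka Connect REST API instead. A minimal sketch, assuming a Connect worker on the default localhost:8083 listener:
# submit the connector configuration to the Connect REST API
# (localhost:8083 is the default REST port; adjust for your worker)
curl -X POST -H "Content-Type: application/json" \
  --data @sftp.json http://localhost:8083/connectors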
Confirm that the connector is in a RUNNING state.
confluent local status CsvSFTP
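The equivalent check against the Connect REST API, again assuming the default localhost:8083 listener:
# query connector status over the REST API
curl -s http://localhost:8083/connectors/CsvSFTP/status | jq '.'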
Confirm that the messages are being sent to Kafka.
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic sftp-testing-topic \
  --from-beginning | jq '.'
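If the connector is working, the consumer should emit one record per CSV row. A sketch of roughly the shape of the first record, based on the test data generated above; the exact encoding (for example, union wrapping of optional fields) depends on the generated schema:
{
  "id": "1",
  "first_name": "Salmon",
  "last_name": "Baitman",
  "email": "sbaitman0@feedburner.com",
  "gender": "Male",
  "ip_address": "120.181.75.98",
  "last_login": "2015-03-01T06:01:15Z",
  "account_balance": "17462.66",
  "country": "IT",
  "favorite_color": "#f09bc0"
}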
Confirm that the source CSV file has been moved to the finished directory, if you have set "cleanup.policy":"MOVE".
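A quick way to verify, assuming the same placeholder credentials and the /path/to/finished directory from sftp.json:
# list the finished directory on the SFTP server
sftp username@localhost <<EOF
ls /path/to/finished
EOF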