SFTP Source Connector for Confluent Platform

The connectors in the Kafka Connect SFTP Source Connector package watch an SFTP directory for files and read the data as new files are written to the SFTP input directory. Once a file has been read, it is placed into the configured finished.path directory (when cleanup.policy is set to MOVE). Each record in the input file is converted based on the user-supplied schema or an auto-generated schema.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

The SFTP Source Connector package bundles several connectors, one for each supported input file format; the Quick Start in this document uses the CSV source connector (SftpCsvSourceConnector).

Features

The SFTP Source Connectors include the following features:

At least once delivery

Each connector in the SFTP Source Connector package guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.

Supports one task

The connectors in the SFTP Source Connector package each support running only one task.

Install the SFTP Source Connector

You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Prerequisites

Important

You must install the connector on every machine where Connect will run.

  • An installation of the Confluent Hub Client.

    Note

    This is installed by default with Confluent Enterprise.

  • An installation of the latest (latest) connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent-hub install confluentinc/kafka-connect-sftp:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent-hub install confluentinc/kafka-connect-sftp:1.1.0-preview
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

File Metadata

Starting with version 2.3.0, the SFTP Source Connector supports exporting file metadata in Kafka headers. The following information is stored in each record’s headers (a consumer sketch follows the list):

  • file.path - The path of the source file on the SFTP server.
  • file.name - The name of the file from which the record was read.
  • file.name.without.extension - The name of the file without the extension.
  • file.last.modified - The last modification time of the file in seconds.
  • file.length - The size of the file in bytes.
  • file.offset - The offset of the record in the file. (N/A for Binary files)
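
Header values can be inspected from the command line. The following is a minimal sketch using kafka-console-consumer, assuming Kafka 2.7 or later (where the print.headers consumer property is available) and the topic name from the Quick Start below:

    kafka-console-consumer \
        --bootstrap-server localhost:9092 \
        --topic sftp-testing-topic \
        --from-beginning \
        --property print.headers=true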

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a list of configuration properties for the source connectors, refer to the specific connector documentation.

Prerequisites

The following are required to run the Connect SFTP Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
  • Java 1.8

Quick Start

The following steps show the SftpCsvSourceConnector loading a mock CSV file to a Kafka topic named sftp-testing-topic. The other connectors are similar but load from different file types.

  1. Install the connector through the Confluent Hub Client.

    # run from your Confluent Platform installation directory
    confluent-hub install confluentinc/kafka-connect-sftp:latest
    

    Tip

    By default, the plugin is installed into share/confluent-hub-components and the sftp directory is added to the plugin path. If this is the first connector you have installed, you may need to restart the Connect worker for the plugin path change to take effect.

  2. Start the Confluent Platform.

    confluent local services start
    
  3. Set up an SFTP data directory for files to process, generate test data locally, and push it to the SFTP server as shown below.

    echo $'id,first_name,last_name,email,gender,ip_address,last_login,account_balance,country,favorite_color\n1,Salmon,Baitman,sbaitman0@feedburner.com,Male,120.181.75.98,2015-03-01T06:01:15Z,17462.66,IT,#f09bc0\n2,Debby,Brea,dbrea1@icio.us,Female,153.239.187.49,2018-10-21T12:27:12Z,14693.49,CZ,#73893a' > "csv-sftp-source.csv"
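
    To push the file to the SFTP server, you can use the sftp command. The following is a minimal sketch that assumes the quick-start credentials and /path/to/data as the configured input directory on the server:

    sftp username@localhost <<EOF
    put csv-sftp-source.csv /path/to/data/
    EOF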
    
  4. Set up SFTP directories for files with errors and files that finished successfully.

    mkdir error && mkdir finished
    
  5. Create an sftp.json file with the following contents:

    {
      "name": "CsvSFTP",
      "config": {
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
        "cleanup.policy":"NONE",
        "behavior.on.error":"IGNORE",
        "input.path": "/path/to/data",
        "error.path": "/path/to/error",
        "finished.path": "/path/to/finished",
        "input.file.pattern": "csv-sftp-source.csv",
        "sftp.username":"username",
        "sftp.password":"password",
        "sftp.host":"localhost",
        "sftp.port":"22",
        "kafka.topic": "sftp-testing-topic",
        "csv.first.row.as.header": "true",
        "schema.generation.enabled": "true"
      }
    }
    
  6. Load the SFTP CSV Source Connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load CsvSFTP --config sftp.json
    

    Important

    Don’t use the Confluent CLI in production environments.
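
    In production, you can submit the same sftp.json payload to the Kafka Connect REST API instead. A minimal sketch, assuming a Connect worker listening on the default REST port 8083:

    curl -X POST -H "Content-Type: application/json" \
        --data @sftp.json \
        http://localhost:8083/connectors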

  7. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status CsvSFTP
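
    Alternatively, query the Kafka Connect REST API directly. A sketch assuming the default REST port 8083:

    curl -s http://localhost:8083/connectors/CsvSFTP/status | jq '.'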
    
  8. Confirm that the messages are being sent to Kafka.

    kafka-avro-console-consumer \
        --bootstrap-server localhost:9092 \
        --property schema.registry.url=http://localhost:8081 \
        --topic sftp-testing-topic \
        --from-beginning | jq '.'
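
    With "schema.generation.enabled": "true", each CSV column is typically generated as an optional string field, so the consumed records should look roughly like the following (illustrative only; the exact shape depends on the generated schema):

    {
      "id": {
        "string": "1"
      },
      "first_name": {
        "string": "Salmon"
      },
      ...
    }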
    
  9. If you opted for "cleanup.policy":"MOVE" (the example configuration above uses "NONE"), confirm that the source CSV file has been moved to the finished directory.
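
    You can verify the move by listing the finished directory over SFTP. A minimal sketch using the quick-start credentials:

    sftp username@localhost <<EOF
    ls /path/to/finished
    EOF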