SFTP Source Connector for Confluent Cloud

The fully-managed Kafka Connect SFTP Source connector watches an SFTP directory for files and reads the data as new files are written to the directory. Each file is parsed using one of the following input.file.parser.format property values, which are also selectable in the UI.

  • BINARY
  • CSV
  • JSON (the default)
  • SCHEMALESS_JSON

Once a file has been read, it is placed into a finished.path directory or an error.path directory.
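
For example, the parser format and the directories the connector uses are set with properties like the following (these values come from the full example configuration in the Quick Start below):

"input.file.parser.format": "CSV",
"input.path": "/path/to/data",
"finished.path": "/path/to/finished",
"error.path": "/path/to/error"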

Features

The SFTP Source connector supports the following features:

  • At least once delivery: The connector guarantees that records are delivered at least once to the Kafka topic (if the file row parsed is valid).
  • Supports one task: The connector supports running one task per connector instance.
  • Supported output data formats: The connector supports Avro, JSON Schema (JSON-SR), Protobuf, JSON (schemaless), Bytes, and String output record value formats. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). See Environment Limitations for additional information.

For configuration property values and descriptions, see Configuration Properties.

For additional information, refer to Cloud connector limitations.

Quick Start

Use this quick start to get up and running with the Confluent Cloud SFTP Source connector. The quick start provides the basics of selecting the connector and configuring it to get data from an SFTP host.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud Platform (GCP).
  • The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
  • Access to an SFTP host.
  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • At least one source Kafka topic must exist in your Confluent Cloud cluster before creating the source connector.

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.

Step 2: Add a connector.

In the left navigation menu, click Data integration, and then click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector.

Click the SFTP Source connector icon.

SFTP Source Connector Icon

Step 4: Set up the connection.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.
  • The steps provide information about how to use the required configuration properties. See Configuration Properties for other configuration property values and descriptions.
  1. Select one or more topics.

  2. Enter a connector Name.

  3. Select an Output Kafka record value format. The connector supports Avro, JSON Schema (JSON_SR), Protobuf, JSON (schemaless), Bytes, and String output Kafka record value formats. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). See Environment Limitations for additional information.

    Note

    Note the following relationship between the Output Kafka record value format and the Input file parser format property:

    • If you select the BINARY Input file parser, you must select BYTES for the output Kafka record value format.
    • If you select the SCHEMALESS_JSON Input file parser, you must select STRING for the output Kafka record value format.
    • If you use the default JSON or select the CSV Input file parser, you can use any output Kafka record value format.
  4. Select an Input file parser format. This is the parser format used to parse fetched files from the SFTP directory. Defaults to JSON.

    Important

    If you use JSON (the default) or CSV as the Input file parser format, you must set Schema generation enabled to true. If this configuration is set to false, you must provide a key schema and a value schema.

  5. Select the way you want to provide Kafka Cluster credentials. You can either select a service account resource ID or you can enter an API key and secret (or generate these in the Cloud Console).

  6. Enter the SFTP Details:

    • SFTP Host: Enter the host address for the SFTP server. For example 192.168.1.231. SFTP Port defaults to 22.
    • Username: Enter the user name that the connector will use to connect to the host.
    • Password: Enter the password for the user name entered. A password is not required if a PEM file is used for key based authentication to the host.
  7. Add the SFTP directory details.

    • Input path: The SFTP directory from which the connector reads files that will be processed. This directory must exist and be writable by the connector.
    • Finished path: The SFTP directory where the connector places files that are successfully processed. This directory must exist and be writable by the connector.
    • Error path: The SFTP directory where the connector places files in which there are errors. This directory must exist and be writable by the connector.
  8. Enter the Input file pattern (regex): A regular expression to check input file names against. This expression must match the entire filename (the equivalent of Matcher.matches()). Using .* accepts all files in the directory.

  9. Enter the number of tasks. The connector supports running one task per connector.

  10. Transforms and Predicates: See the Single Message Transforms (SMT) documentation for details.

See Configuration Properties for configuration property values and descriptions.

Step 5: Launch the connector.

Verify the connection details and click Launch.

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running.

Step 7: Check for records.

Verify that records are being produced in the Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

Using the Confluent CLI

To set up and run the connector using the Confluent CLI, complete the following steps.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors.

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

confluent connect plugin describe <connector-catalog-name>

For example:

confluent connect plugin describe SftpSource

Example output:

Following are the required configs:
connector.class: SftpSource
name
kafka.api.key
kafka.api.secret
kafka.topic
output.data.format
sftp.host
sftp.username
input.path
finished.path
error.path
input.file.pattern
tasks.max

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "connector.class": "SftpSource",
  "name": "SftpSourceConnector_0",
  "kafka.api.key": "****************",
  "kafka.api.secret": "*********************************",
  "kafka.topic": "orders",
  "output.data.format": "JSON",
  "input.file.parser.format": "CSV",
  "schema.generation.enable": "true",
  "sftp.host": "192.168.1.231",
  "sftp.username": "connect-user",
  "sftp.password:": "****************",
  "input.path": "/path/to/data",
  "finished.path": "/path/to/finished",
  "error.path": "/path/to/error",
  "input.file.pattern": "csv-sftp-source.csv",
  "tasks.max": "1",
}

Note the following property definitions:

  • "connector.class": Identifies the connector plugin name.
  • "name": Sets a name for your new connector.
  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "kafka.topic": Enter the topic name or a comma-separated list of topic names.

  • "output.data.format": The connector supports Avro, JSON Schema (JSON_SR), Protobuf, JSON (schemaless), Bytes, and String output Kafka record value formats. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). See Environment Limitations for additional information.

    Note

    Note the following relationship between output.data.format and the input.file.parser.format property.

    • If you use BINARY for input.file.parser.format, you must use BYTES for output.data.format.
    • If you use SCHEMALESS_JSON for input.file.parser.format, you must use STRING for output.data.format.
    • If you leave input.file.parser.format set to JSON (the default) or use CSV, you can use any format for output.data.format.
  • "input.file.parser.format": The parser format used to parse fetched files from the SFTP directory. Defaults to JSON. Options are BINARY, CSV, JSON, SCHEMALESS_JSON.

    Important

    If you use JSON (the default) or CSV as the input.file.parser.format, then you must add the property schema.generation.enable and set it to true. If you set this property to false, you must provide a key.schema and a value.schema.

  • "sftp.host": Enter the host address for the SFTP server. For example 192.168.1.231. Note that the port defaults to 22. To change this, add the property "sftp.port".

  • "sftp.username": Enter the user name that the connector will use to connect to the host. The "sftp.password" property is not required if a PEM file is used for key based authentication to the host.

  • "input.path": Add the SFTP directory where the connector places files that are successfully processed. This directory must exist and be writable by the connector.

  • "finished.path": Add the SFTP directory from which the connector reads files that will be processed. This directory must exist and be writable by the connector.

  • "error.path": Add the SFTP directory where the connector places files in which there are errors. This directory must exist and be writable by the connector.

  • "input.file.pattern": Add a regular expression to check input file names against. This expression must match the entire filename. The equivalent of Matcher.matches(). Using .* accepts all files in the directory.

  • "tasks.max": The connector supports running one tasks per connector.

Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI.

For configuration property values and descriptions, see Configuration Properties.

Step 4: Load the configuration file and create the connector.

Enter the following command to load the configuration and start the connector:

confluent connect create --config <file-name>.json

For example:

confluent connect create --config sftp-source-config.json

Example output:

Created connector SftpSourceConnector_0 lcc-do6vzd

Step 5: Check the connector status.

Enter the following command to check the connector status:

confluent connect list

Example output:

ID           |             Name            | Status  | Type   | Trace
+------------+-----------------------------+---------+--------+-------+
lcc-do6vzd   | SftpSourceConnector_0       | RUNNING | source |       |

Step 6: Check the Kafka topic.

Verify that records are being produced in the Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

Configuration Properties

The following connector configuration properties can be used with the SFTP Source connector for Confluent Cloud.

Common configuration properties

The following configuration properties are valid for all input file parser formats (that is, BINARY, CSV, JSON, and SCHEMALESS_JSON).

sftp.host

SFTP server host address.

  • Importance: High
  • Type: String
sftp.port

SFTP port number.

  • Importance: High
  • Type: Int
  • Default value: 22
sftp.username

User name for the SFTP server connection.

  • Importance: High
  • Type: String
sftp.password

Password for the SFTP server connection. A password is not required if using a PEM file for TLS.

  • Importance: High
  • Type: Password
tls.pemfile

PEM file used for authentication using TLS. When using this property in the CLI, you must encode the binary PEM file in base64, take the encoded string, add the data:text/plain;base64 prefix, and then specify the entire string as the property entry. For example: "tls.pemfile" : "data:text/plain;base64,/u3+7QAAAAIAAAACAAAAAQAGY2xpZ...==".

  • Importance: High
  • Type: Password
tls.passphrase

Passphrase that is used to decrypt the private key if the given private key is encrypted.

  • Importance: High
  • Type: Password
output.data.format

Sets the output Kafka record value format. The connector supports Avro, JSON Schema (JSON_SR), Protobuf, JSON (schemaless), Bytes, and String output Kafka record value formats. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). See Environment Limitations for additional information.

  • Importance: High
  • Type: String
  • Valid values: AVRO, JSON_SR, PROTOBUF, JSON, STRING, BYTES

Note

Note the following relationship between output.data.format and the input.file.parser.format property.

  • If you use BINARY for input.file.parser.format, you must use BYTES for output.data.format.
  • If you use SCHEMALESS_JSON for input.file.parser.format, you must use STRING for output.data.format.
  • If you leave input.file.parser.format set to JSON (the default) or use CSV, you can use any format for output.data.format.
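
For example, a configuration that uses the BINARY parser must pair it with the BYTES output format:

"input.file.parser.format": "BINARY",
"output.data.format": "BYTES"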
input.file.parser.format

The parser format used to parse fetched files from the SFTP directory. Defaults to JSON.

  • Importance: High
  • Type: String
  • Default value: JSON
  • Valid values: BINARY, CSV, JSON, SCHEMALESS_JSON

Important

If you use JSON (the default) or CSV as the input.file.parser.format, you must set the property schema.generation.enable to true. If you set this property to false, you must also provide a key.schema and a value.schema.

input.path

The directory where the connector reads files. This directory must exist and be writable by the connector.

  • Importance: High
  • Type: String
  • Valid value: Absolute path to an SFTP directory that exists and is writable.
finished.path

The directory where the connector puts files that are successfully processed. This directory must exist and be writable by the connector.

  • Importance: High
  • Type: String
  • Valid value: Absolute path to an SFTP directory that exists and is writable.
error.path

The directory to place files that have errors. This directory must exist and be writable by the connector.

  • Importance: High
  • Type: String
  • Valid value: Absolute path to an SFTP directory that exists and is writable.
input.file.pattern

A regular expression to check input file names against. This expression must match the entire filename. The equivalent of Matcher.matches(). Using .* accepts all files in the directory.

  • Importance: High
  • Type: String
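
For example, to accept any file ending in .csv (a hypothetical pattern; the backslash is escaped when the value is set in a JSON configuration file):

"input.file.pattern": ".*\\.csv"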
tasks.max

Number of tasks for the connector. The connector supports running one task per connector.

  • Importance: High
  • Type: Int
  • Valid values: 1
behavior.on.error

Sets whether the connector stops or continues when it encounters an error. FAIL stops the connector when any error occurs. IGNORE ignores the error and continues to the next file for processing.

  • Importance: High
  • Type: String
  • Default Value: FAIL
  • Valid values: FAIL, IGNORE
cleanup.policy

Determines how the connector should clean up files that the connector successfully processes. NONE leaves the files in place. If the connector is restarted, using NONE may cause files to be reprocessed. DELETE removes processed files from the filesystem. MOVE (the default) places processed files in the finished.path directory.

  • Importance: Medium
  • Type: String
  • Default value: MOVE
  • Valid values: NONE, DELETE, MOVE
file.minimum.age.ms

The amount of time in milliseconds after the file was last written to before the file can be processed. When set to 0 (the default), the connector processes all files irrespective of age.

  • Importance: Low
  • Type: Long
  • Default Value: 0
  • Valid values: [0,…]
batch.size

The number of records that should be returned in a batch. Defaults to 1000 records.

  • Importance: Low
  • Default Value: 1000
  • Type: Int
empty.poll.wait.ms

The amount of time to wait in milliseconds if a poll returns an empty list of records.

  • Importance: Low
  • Type: Long
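
For example, a hypothetical tuning fragment (all values are illustrative only) that skips files with errors, deletes processed files, and waits one minute after the last write before processing a file:

"behavior.on.error": "IGNORE",
"cleanup.policy": "DELETE",
"file.minimum.age.ms": "60000",
"batch.size": "500",
"empty.poll.wait.ms": "5000"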

JSON or CSV configuration properties

The following configuration properties are valid only for JSON or CSV file parser formats.

timestamp.mode

Determines how the connector sets a timestamp for the Kafka topic record. If set to FIELD, the timestamp is read from a field in the value. This field cannot be optional and must be a timestamp field; you specify it using the timestamp.field property. FILE_TIME sets the timestamp to the last time the file was modified. PROCESS_TIME sets the timestamp to the time at which the connector reads the record.

  • Importance: Medium
  • Type: String
  • Valid values: FIELD, FILE_TIME, PROCESS_TIME
timestamp.field

The field in the value schema that contains the parsed timestamp for the record. This field cannot be marked as optional and must be a timestamp field.

  • Importance: Medium
  • Type: String
parser.timestamp.timezone

The timezone used when parsing all dates.

  • Importance: Low
  • Type: String
parser.timestamp.date.formats

The date formats that are expected in the file. This is a list of strings used to parse the date fields. These formats are parsed in the order listed. The most accurate date format should be the first in the list. For more information, see SimpleDateFormat.

  • Importance: Low
  • Type: List
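
For example, a hypothetical fragment that reads the record timestamp from an order_timestamp field (a made-up field name) parsed with SimpleDateFormat patterns. The exact list syntax for parser.timestamp.date.formats is an assumption (shown here as a comma-separated string):

"timestamp.mode": "FIELD",
"timestamp.field": "order_timestamp",
"parser.timestamp.timezone": "UTC",
"parser.timestamp.date.formats": "yyyy-MM-dd'T'HH:mm:ss, yyyy-MM-dd"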
key.schema

The schema for the key the connector writes to Kafka. For example:

"key.schema" : "{\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false}}}"
  • Importance: High
  • Type: String
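
For readability, the same key.schema value unescaped:

{
  "name": "com.example.users.UserKey",
  "type": "STRUCT",
  "isOptional": false,
  "fieldSchemas": {
    "id": {
      "type": "INT64",
      "isOptional": false
    }
  }
}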
value.schema

The schema for the value the connector writes to Kafka. For example:

"value.schema" : "{\"name\" : \"com.example.users.User\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false},\"first_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"email\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"gender\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"ip_address\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_login\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"account_balance\" : {\"name\" : \"org.apache.kafka.connect.data.Decimal\",\"type\" : \"BYTES\",\"version\" : 1,\"parameters\" : {\"scale\" : \"2\"},\"isOptional\" : true},\"country\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"favorite_color\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}"
  • Importance: High
  • Type: String
schema.generation.enabled

Sets whether schemas should be dynamically generated. If set to true, key.schema and value.schema can be omitted, but schema.generation.key.name and schema.generation.value.name must be set. If set to false, key.schema and value.schema must be set.

  • Importance: Medium
  • Type: String

Important

If you use JSON (the default) or CSV as the input.file.parser.format, then you must add the property schema.generation.enable and set it to true. If you set this property to false, you must also provide a key.schema and a value.schema.

schema.generation.key.fields

The field (or comma-separated fields) to use to build a key schema. This is only used during schema generation.

  • Importance: Medium
  • Type: List
schema.generation.key.name

The name of the generated key schema.

  • Importance: Medium
  • Type: String
schema.generation.value.name

The name of the generated value schema.

  • Importance: Medium
  • Type: String
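
For example, a hypothetical fragment that generates schemas rather than supplying them, reusing the schema names from the key.schema and value.schema examples above and the id field as the key field. The property name is shown as listed in this section (the Quick Start example uses the spelling schema.generation.enable); verify the exact name with confluent connect plugin describe.

"input.file.parser.format": "CSV",
"schema.generation.enabled": "true",
"schema.generation.key.fields": "id",
"schema.generation.key.name": "com.example.users.UserKey",
"schema.generation.value.name": "com.example.users.User"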

CSV configuration properties

The following configuration properties are valid only for the CSV input file parser format. These properties are exposed in the UI if you select CSV.

csv.skip.lines

Number of lines to skip at the beginning of the file. Defaults to 0 (no lines are skipped).

  • Importance: Low
  • Type: Int
  • Default value: 0
csv.separator.char

The character that separates each field, specified as an integer. The default is the comma character (value 44); a TSV file uses the tab character (value 9). If csv.separator.char is defined as null (value 0), the RFC 4180 parser is used by default.

  • Importance: Low
  • Type: Int
  • Default value: 44
csv.quote.char

The character that is used to quote a field. The default is the double quote " character (value 34). Quoting is typically used when the csv.separator.char character appears within the data.

  • Importance: Low
  • Type: Int
  • Default value: 34
csv.escape.char

The escape character, specified as an integer, to use when a special character is encountered. The default escape character is the backslash \ character (value 92).

  • Importance: Low
  • Type: Int
  • Default value: 92
csv.strict.quotes

Sets whether or not strict quotes are enabled. If set to true, characters outside the quotes are ignored.

  • Importance: Low
  • Type: String
  • Default value: false
csv.ignore.leading.whitespace

Sets whether or not to ignore leading whitespace. If set to true, the white space in front of a quote in a field is ignored.

  • Importance: Low
  • Type: String
  • Default value: false
csv.ignore.quotations

Sets whether or not to ignore quotations. If set to true, quotation characters are ignored.

  • Importance: Low
  • Type: String
  • Default value: false
csv.keep.carriage.return

Sets whether or not to maintain a carriage return at the end of a line. If set to true, the carriage return is maintained.

  • Importance: Low
  • Type: String
  • Default value: false
csv.null.field.indicator

Sets the indicator that the CSV reader uses to determine if a field is null. Valid values are EMPTY_SEPARATORS, EMPTY_QUOTES, BOTH, and NEITHER. For more information, see Enum CSVReaderNullFieldIndicator.

  • Importance: Low
  • Type: String
  • Default value: NEITHER
csv.first.row.as.header

Sets whether the first row of data contains the header of the file. If set to true, the positions of the columns are determined by the first row of the CSV. The column position is inferred from the position of the field in the schema supplied in the value.schema property. If set to true, the number of columns must be greater than or equal to the number of fields in the schema.

  • Importance: Low
  • Type: String
  • Default value: false
csv.file.charset

Sets the character set used to read the file.

  • Importance: Low
  • Type: String
  • Default value: UTF-8
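
For example, a hypothetical fragment (values are illustrative) for tab-separated files that have a header row and one leading line to skip:

"input.file.parser.format": "CSV",
"csv.separator.char": "9",
"csv.first.row.as.header": "true",
"csv.skip.lines": "1",
"csv.file.charset": "UTF-8"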

Next Steps

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
