CSV Source Connector for Confluent Platform

The Kafka Connect CSV Source connector monitors the SFTP directory specified in input.path for files and reads them as CSV, converting each record to the strongly typed equivalent specified in key.schema and value.schema. The connector can also generate key.schema and value.schema automatically at runtime if schema.generation.enabled is set to true.

To use this connector, specify the name of the connector class in the connector.class configuration property.

connector.class=io.confluent.connect.sftp.SftpCsvSourceConnector

Limitations

The CSV Source connector authenticates with Kerberos only when a keytab is cached on the system, not when the keytab is provided in the connector configuration. Additionally, the connector works with Kerberos only on Confluent Platform 5.5.4 or later.

CSV Source Connector Examples

The following examples follow the same steps as the Quick Start for installing Confluent Platform and the SFTP connector package. Review the Quick Start for help running the Confluent Platform and installing the SFTP connector package.

CSV with Schema Example

This example reads CSV files and writes them to Kafka. It parses them using the schema specified in key.schema and value.schema.

  1. Set up an SFTP data directory for files to process. Generate test data on your local machine and push it to the SFTP server. For example:

    echo $'1,Salmon,Baitman,sbaitman0@feedburner.com,Male,120.181.75.98,2015-03-01T06:01:15Z,17462.66,IT,#f09bc0\n2,Debby,Brea,dbrea1@icio.us,Female,153.239.187.49,2018-10-21T12:27:12Z,14693.49,CZ,#73893a' > "csv-sftp-source.csv"
    
  2. Create an sftp.properties file with the following contents:

    name=CsvSchemaSftp
    kafka.topic=sftp-testing-topic
    tasks.max=1
    connector.class=io.confluent.connect.sftp.SftpCsvSourceConnector
    cleanup.policy=NONE
    behavior.on.error=IGNORE
    input.path=/path/to/data
    error.path=/path/to/error
    finished.path=/path/to/finished
    input.file.pattern=csv-sftp-source.csv
    sftp.username=username
    sftp.password=password
    sftp.host=localhost
    sftp.port=22
    csv.first.row.as.header=false
    key.schema={\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false}}}
    value.schema={\"name\" : \"com.example.users.User\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false},\"first_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"email\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"gender\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"ip_address\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_login\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"account_balance\" : {\"name\" : \"org.apache.kafka.connect.data.Decimal\",\"type\" : \"BYTES\",\"version\" : 1,\"parameters\" : {\"scale\" : \"2\"},\"isOptional\" : true},\"country\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"favorite_color\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}
    
  3. Load the SFTP CSV Source connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load CsvSchemaSftp --config sftp.properties
    

    Important

    Don’t use the Confluent CLI in production environments.

  4. Validate that messages are sent to Kafka serialized with Avro.

    kafka-avro-console-consumer --topic sftp-testing-topic --from-beginning --bootstrap-server localhost:9092
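
Because csv.first.row.as.header=false in step 2, the test file carries no column names, so this example presumably relies on each row supplying one column per field declared in value.schema (ten here). The following is a minimal sketch, not part of the connector, that counts the fields in the step 1 test data before you upload it. It assumes a POSIX shell with awk and that no field contains a quoted comma:

```shell
# Recreate the two-row test file from step 1 (same records, written with printf).
printf '%s\n' \
  '1,Salmon,Baitman,sbaitman0@feedburner.com,Male,120.181.75.98,2015-03-01T06:01:15Z,17462.66,IT,#f09bc0' \
  '2,Debby,Brea,dbrea1@icio.us,Female,153.239.187.49,2018-10-21T12:27:12Z,14693.49,CZ,#73893a' \
  > csv-sftp-source.csv

# Report the number of comma-separated fields in each row.
# This naive split does not handle quoted fields that contain commas.
awk -F',' '{ print "row " NR ": " NF " fields" }' csv-sftp-source.csv
```

Both rows report 10 fields, which matches the ten entries under fieldSchemas in value.schema.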
    

TSV Input File Example

The following example loads a TSV file and produces each record to Kafka. The connector handles TSV files just like CSV files, except that you must set the additional configuration property csv.separator.char=9. The number 9 is the ASCII code for the tab character.
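
To see where the value 9 comes from, you can ask the shell for a character's numeric code. This is a small sketch assuming a POSIX shell; it uses the standard printf behavior where a numeric argument beginning with a quote is converted to the code of the character that follows. The comma is included only for comparison:

```shell
# Capture a literal tab character in a variable.
tab=$(printf '\t')

# A leading single quote tells printf to output the character's numeric code.
printf '%d\n' "'$tab"   # prints 9 (tab)
printf '%d\n' "',"      # prints 44 (comma)
```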

  1. Generate a TSV data set using the command below:

    echo $'id\tfirst_name\tlast_name\temail\tgender\tip_address\tlast_login\taccount_balance\tcountry\tfavorite_color\n1\tPadraig\tOxshott\tpoxshott0@dion.ne.jp\tMale\t47.243.121.95\t2016-06-24T22:43:42Z\t15274.22\tJP\t#06708f\n2\tEdi\tOrrah\teorrah1@cafepress.com\tFemale\t158.229.234.101\t2017-03-01T17:52:47Z\t12947.6\tCN\t#5f2aa2' > "tsv-sftp-source.tsv"
    
  2. Create an sftp.properties file with the following contents:

    name=TsvSftp
    tasks.max=1
    connector.class=io.confluent.connect.sftp.SftpCsvSourceConnector
    input.path=/path/to/data
    error.path=/path/to/error
    finished.path=/path/to/finished
    cleanup.policy=NONE
    input.file.pattern=tsv-sftp-source.tsv
    behavior.on.error=IGNORE
    sftp.username=username
    sftp.password=password
    sftp.host=localhost
    sftp.port=22
    kafka.topic=sftp-tsv-topic
    schema.generation.enabled=true
    csv.first.row.as.header=true
    csv.separator.char=9
    
  3. Load the SFTP TSV Source connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load TsvSftp --config sftp.properties
    

    Important

    Don’t use the Confluent CLI in production environments.
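
With csv.first.row.as.header=true and schema.generation.enabled=true, the configuration supplies no explicit schema, so the header row is where the field names would be expected to come from (an assumption based on the property names, not something stated above). A minimal sketch, assuming a POSIX shell, that lists the header columns of the step 1 test file one per line:

```shell
# Recreate the test file from step 1 (header row plus two records).
printf 'id\tfirst_name\tlast_name\temail\tgender\tip_address\tlast_login\taccount_balance\tcountry\tfavorite_color\n1\tPadraig\tOxshott\tpoxshott0@dion.ne.jp\tMale\t47.243.121.95\t2016-06-24T22:43:42Z\t15274.22\tJP\t#06708f\n2\tEdi\tOrrah\teorrah1@cafepress.com\tFemale\t158.229.234.101\t2017-03-01T17:52:47Z\t12947.6\tCN\t#5f2aa2\n' > tsv-sftp-source.tsv

# Print the header row with each tab-separated column name on its own line.
head -n 1 tsv-sftp-source.tsv | tr '\t' '\n'
```

The output lists ten column names, from id through favorite_color.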

Key-Based TLS Authentication Example

The following connector configuration example shows the configuration properties used when the connector requires TLS private/public key authentication:

name=sftp-source-connector
kafka.topic=sftptopic
tasks.max=1
connector.class=io.confluent.connect.sftp.SftpCsvSourceConnector
tls.private.key=<tls-private-key>
tls.public.key=<tls-public-key>
#tls.passphrase=<uncomment-if-required>
sftp.username=<sftp-username>
confluent.topic.bootstrap.servers=localhost:9092
sftp.host=192.168.2.5
sftp.port=22
flush.size=3
format.class=io.confluent.connect.avro.AvroConverter
sftp.working.dir=/home/<path-to-files>/

Before configuring the tls.private.key property, see the example shown in the Security section of the SFTP Source Connector Configuration Properties.

Configuration Properties

For connector-specific configuration properties, see SFTP Source Connector Configuration Properties.