CSV Source Connector for Confluent Platform¶
The Kafka Connect CSV Source connector monitors the SFTP directory specified in input.path for files and reads them as CSVs, converting each record to the strongly typed equivalent specified in key.schema and value.schema. The connector can also auto-generate key.schema and value.schema at run time if schema.generation.enabled is set to true.
To use this connector, specify the name of the connector class in the connector.class configuration property.
connector.class=io.confluent.connect.sftp.SftpCsvSourceConnector
Limitations¶
The CSV Source connector authenticates with Kerberos only when a keytab is cached on the system, not when the keytab is provided in the configuration. Additionally, the connector only works with Kerberos on Confluent Platform 5.5.4 or later.
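Caching the credential ahead of time can be sketched as follows. This assumes the MIT Kerberos client tools are installed; the principal and keytab path are hypothetical and must be adjusted for your environment:

```shell
# Hypothetical principal and keytab path; adjust for your environment.
# kinit caches a ticket on the system so the connector can pick it up
# from the credential cache rather than reading the keytab directly.
kinit -kt /etc/security/keytabs/connect.keytab connect-user@EXAMPLE.COM
klist   # verify the cached ticket
```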
CSV Source Connector Examples¶
The following examples assume the same setup as the Quick Start. Review the Quick Start for help running Confluent Platform and installing the SFTP connector package.
CSV with Schema Example¶
This example reads CSV files and writes them to Kafka. It parses them using the schemas specified in key.schema and value.schema.
Set up an SFTP data directory for files to process, generate test data locally, and push it to the SFTP server. For example:
echo $'1,Salmon,Baitman,sbaitman0@feedburner.com,Male,120.181.75.98,2015-03-01T06:01:15Z,17462.66,IT,#f09bc0\n2,Debby,Brea,dbrea1@icio.us,Female,153.239.187.49,2018-10-21T12:27:12Z,14693.49,CZ,#73893a' > "csv-sftp-source.csv"
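Before pushing the file to the SFTP server, you can sanity-check it locally. A minimal sketch: the awk one-liner counts the comma-separated fields per row, which should match the ten fields declared in value.schema:

```shell
# Recreate the test file (same data as above) and confirm every row has
# the 10 comma-separated fields that the value.schema expects.
echo $'1,Salmon,Baitman,sbaitman0@feedburner.com,Male,120.181.75.98,2015-03-01T06:01:15Z,17462.66,IT,#f09bc0\n2,Debby,Brea,dbrea1@icio.us,Female,153.239.187.49,2018-10-21T12:27:12Z,14693.49,CZ,#73893a' > csv-sftp-source.csv
awk -F',' '{ print NF }' csv-sftp-source.csv
```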
Create an sftp.properties file with the following contents:

name=CsvSchemaSftp
kafka.topic=sftp-testing-topic
tasks.max=1
connector.class=io.confluent.connect.sftp.SftpCsvSourceConnector
cleanup.policy=NONE
behavior.on.error=IGNORE
input.path=/path/to/data
error.path=/path/to/error
finished.path=/path/to/finished
input.file.pattern=csv-sftp-source.csv
sftp.username=username
sftp.password=password
sftp.host=localhost
sftp.port=22
csv.first.row.as.header=false
key.schema={\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false}}}
value.schema={\"name\" : \"com.example.users.User\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false},\"first_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"email\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"gender\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"ip_address\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_login\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"account_balance\" : {\"name\" : \"org.apache.kafka.connect.data.Decimal\",\"type\" : \"BYTES\",\"version\" : 1,\"parameters\" : {\"scale\" : \"2\"},\"isOptional\" : true},\"country\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"favorite_color\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}
Load the SFTP CSV Source connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local services connect connector load CsvSchemaSftp --config sftp.properties
Important
Don’t use the Confluent CLI in production environments.
Validate that messages are produced to Kafka serialized with Avro.
kafka-avro-console-consumer --topic sftp-testing-topic --from-beginning --bootstrap-server localhost:9092
TSV Input File Example¶
The following example loads a TSV file and produces each record to Kafka. The TSV Source connector works just like the CSV Source connector, except that you must provide the extra configuration property csv.separator.char=9. The number 9 is the ASCII value of the tab character.
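You can confirm the mapping with a quick shell one-liner. POSIX printf prints a character's numeric code when the %d argument starts with a single quote:

```shell
# Print the ASCII code of the tab character. The leading single quote in
# the argument tells printf %d to emit the code of the character after it.
printf '%d\n' "'$(printf '\t')"
# 9
```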
Generate a TSV data set using the command below:
echo $'id\tfirst_name\tlast_name\temail\tgender\tip_address\tlast_login\taccount_balance\tcountry\tfavorite_color\n1\tPadraig\tOxshott\tpoxshott0@dion.ne.jp\tMale\t47.243.121.95\t2016-06-24T22:43:42Z\t15274.22\tJP\t#06708f\n2\tEdi\tOrrah\teorrah1@cafepress.com\tFemale\t158.229.234.101\t2017-03-01T17:52:47Z\t12947.6\tCN\t#5f2aa2' > "tsv-sftp-source.tsv"
Create an sftp.properties file with the following contents:

name=TsvSftp
tasks.max=1
connector.class=io.confluent.connect.sftp.SftpCsvSourceConnector
input.path=/path/to/data
error.path=/path/to/error
finished.path=/path/to/finished
cleanup.policy=NONE
input.file.pattern=tsv-sftp-source.tsv
behavior.on.error=IGNORE
sftp.username=username
sftp.password=password
sftp.host=localhost
sftp.port=22
kafka.topic=sftp-tsv-topic
schema.generation.enabled=true
csv.first.row.as.header=true
csv.separator.char=9
Load the SFTP TSV Source connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local services connect connector load TsvSftp --config sftp.properties
Important
Don’t use the Confluent CLI in production environments.
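As with the CSV example, you can validate that records reached Kafka with the Avro console consumer. The topic name matches the kafka.topic value in the configuration above; this command assumes a broker running on localhost:9092:

```shell
# Consume the TSV records from the beginning of the topic,
# deserializing them with Avro.
kafka-avro-console-consumer --topic sftp-tsv-topic --from-beginning --bootstrap-server localhost:9092
```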
Key-Based TLS Authentication Example¶
The following connector configuration example shows the configuration properties used when the connector requires TLS private/public key authentication:
name=sftp-source-connector
topics=sftptopic
tasks.max=1
connector.class=io.confluent.connect.sftp.SftpCsvSourceConnector
tls.private.key=<tls-private-key>
tls.public.key=<tls-public-key>
#tls.passphrase=<uncomment-if-required>
sftp.username=<sftp-username>
confluent.topic.bootstrap.servers=localhost:9092
sftp.host=192.168.2.5
sftp.port=22
flush.size=3
format.class=io.confluent.connect.avro.AvroConverter
sftp.working.dir=/home/<path-to-files>/
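One way to produce such a key pair is with OpenSSH's ssh-keygen. This is a sketch, not the connector's required procedure: the file name sftp_key is arbitrary, and your SFTP server must be configured to accept the resulting public key:

```shell
# Generate a 4096-bit RSA key pair; -m PEM writes the private key in PEM
# format and -N '' sets an empty passphrase. If you set a passphrase
# instead, uncomment tls.passphrase in the connector configuration.
ssh-keygen -t rsa -b 4096 -m PEM -f sftp_key -N '' -q
ls -l sftp_key sftp_key.pub
```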
Before configuring the tls.private.key property, see the example shown in the Security section of the SFTP Source Connector Configuration Properties.
Configuration Properties¶
For connector-specific configuration properties, see SFTP Source Connector Configuration Properties.