JSON Source Connector for Confluent Platform
The Kafka Connect JSON Source connector streams JSON files from an SFTP directory and converts the data based on the schema supplied in the configuration. If no schema is supplied in the configuration, the connector can generate the key.schema and value.schema at run time when schema.generation.enabled is true.
To use this connector, specify the name of this connector class in the connector.class configuration property, as shown below:
connector.class=io.confluent.connect.sftp.SftpJsonSourceConnector
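As a minimal sketch of the schema generation option described above, the following writes a configuration that omits key.schema and value.schema and sets schema.generation.enabled to true instead. The file name sftp-generated-schema.properties is a hypothetical choice for illustration, and the paths, credentials, and topic are the same placeholders used in the example later on this page; adjust them for your environment.

```shell
# Hypothetical file name; not part of the connector or the Quick Start.
config_file="sftp-generated-schema.properties"

# No key.schema or value.schema here: the connector is asked to
# generate both at run time via schema.generation.enabled=true.
cat > "$config_file" <<'EOF'
name=JsonSftp
tasks.max=1
connector.class=io.confluent.connect.sftp.SftpJsonSourceConnector
input.path=/path/to/data
error.path=/path/to/error
finished.path=/path/to/finished
cleanup.policy=NONE
input.file.pattern=json-sftp-source.json
behavior.on.error=IGNORE
sftp.username=username
sftp.password=password
sftp.host=localhost
sftp.port=22
kafka.topic=sftp-json-topic
schema.generation.enabled=true
EOF
```

Generated schemas depend on the data the connector reads, so supplying explicit schemas is likely the safer choice when field types such as decimals must be controlled.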
JSON Source Connector Example
This example follows the same steps as the Quick Start. Review the Quick Start for help running the Confluent Platform and installing the SFTP connector package.
Generate a JSON dataset using the command below, then push the resulting file to the SFTP server:
echo $'{"id":1,"first_name":"Roscoe","last_name":"Brentnall","email":"rbrentnall0@mediafire.com","gender":"Male","ip_address":"202.84.142.254","last_login":"2018-02-12T06:26:23Z","account_balance":1450.68,"country":"CZ","favorite_color":"#4eaefa"}\n{"id":2,"first_name":"Gregoire","last_name":"Fentem","email":"gfentem1@nsw.gov.au","gender":"Male","ip_address":"221.159.106.63","last_login":"2015-03-27T00:29:56Z","account_balance":1392.37,"country":"ID","favorite_color":"#e8f686"}' > "json-sftp-source.json"
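The dataset holds one JSON object per line. Before uploading, a quick shape check can catch a file that was pasted or escaped incorrectly. The sketch below runs against a small throwaway file (the name sample-records.json and the helper count_malformed are hypothetical); it only checks that each line starts with { and ends with }, and does not fully validate the JSON.

```shell
# Print the number of lines that do not look like a single JSON object.
# grep -c exits non-zero when the count is 0, so "|| true" keeps the
# function from reporting a failure in that case.
count_malformed() {
  grep -c -v '^{.*}$' "$1" || true
}

# Throwaway two-record sample (hypothetical file name).
sample="sample-records.json"
printf '%s\n' '{"id":1,"country":"CZ"}' '{"id":2,"country":"ID"}' > "$sample"

# One record per line, so the line count is the record count.
record_count=$(wc -l < "$sample" | tr -d ' ')
malformed=$(count_malformed "$sample")

echo "records=$record_count malformed=$malformed"
```

To check the dataset from this example, point the same two commands at json-sftp-source.json instead of the sample file.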
Create an sftp.properties file with the following contents:
name=JsonSftp
tasks.max=1
connector.class=io.confluent.connect.sftp.SftpJsonSourceConnector
input.path=/path/to/data
error.path=/path/to/error
finished.path=/path/to/finished
cleanup.policy=NONE
input.file.pattern=json-sftp-source.json
behavior.on.error=IGNORE
sftp.username=username
sftp.password=password
sftp.host=localhost
sftp.port=22
kafka.topic=sftp-json-topic
key.schema={\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false}}}
value.schema={\"name\" : \"com.example.users.User\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false},\"first_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"email\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"gender\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"ip_address\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_login\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"account_balance\" : {\"name\" : \"org.apache.kafka.connect.data.Decimal\",\"type\" : \"BYTES\",\"version\" : 1,\"parameters\" : {\"scale\" : \"2\"},\"isOptional\" : true},\"country\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"favorite_color\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}
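Because the file contains many properties and two long schema values, it can help to confirm that the expected keys are present before loading the connector. The following is a minimal sketch, assuming one key=value pair per line; the helper missing_keys and the file demo.properties are hypothetical names used only for this illustration.

```shell
# Print every expected key that has no "key=value" line in the file.
# Usage: missing_keys FILE KEY...
missing_keys() {
  file="$1"
  shift
  for key in "$@"; do
    # Compare against the text before the first "=" on each line.
    cut -d= -f1 "$file" | grep -qxF -- "$key" || echo "$key"
  done
}

# Demo on a deliberately incomplete throwaway file.
printf '%s\n' 'name=JsonSftp' 'kafka.topic=sftp-json-topic' > demo.properties
missing=$(missing_keys demo.properties name connector.class kafka.topic input.path)

echo "$missing"
```

Run against sftp.properties with the keys from this example, the helper should print nothing when every key is present.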
Load the SFTP JSON Source connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load JsonSftp --config sftp.properties
Important
Don’t use the Confluent CLI in production environments.
Configuration Properties
For connector-specific configuration properties, see SFTP Source Connector Configuration Properties.