JSON Source Connector for Confluent Platform

The Kafka Connect JSON Source connector streams JSON files from an SFTP directory and converts the data based on the schema supplied in the configuration. If no schema is supplied, the connector can generate the key.schema and value.schema at run time when schema.generation.enabled is set to true.

To use this connector, specify the name of the connector class in the connector.class configuration property, as shown below:

connector.class=io.confluent.connect.sftp.SftpJsonSourceConnector

JSON Source Connector Example

This example follows the same steps as the Quick Start. Review the Quick Start for help running the Confluent Platform and installing the SFTP connector package.

  1. Generate a JSON dataset (two records, one per line) using the command below and push it to the SFTP server:

    echo $'{"id":1,"first_name":"Roscoe","last_name":"Brentnall","email":"rbrentnall0@mediafire.com","gender":"Male","ip_address":"202.84.142.254","last_login":"2018-02-12T06:26:23Z","account_balance":1450.68,"country":"CZ","favorite_color":"#4eaefa"}\n{"id":2,"first_name":"Gregoire","last_name":"Fentem","email":"gfentem1@nsw.gov.au","gender":"Male","ip_address":"221.159.106.63","last_login":"2015-03-27T00:29:56Z","account_balance":1392.37,"country":"ID","favorite_color":"#e8f686"}' > "json-sftp-source.json"
    
  2. Create an sftp.properties file with the following contents:

    name=JsonSftp
    tasks.max=1
    connector.class=io.confluent.connect.sftp.SftpJsonSourceConnector
    input.path=/path/to/data
    error.path=/path/to/error
    finished.path=/path/to/finished
    cleanup.policy=NONE
    input.file.pattern=json-sftp-source.json
    behavior.on.error=IGNORE
    sftp.username=username
    sftp.password=password
    sftp.host=localhost
    sftp.port=22
    kafka.topic=sftp-json-topic
    key.schema={\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false}}}
    value.schema={\"name\" : \"com.example.users.User\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false},\"first_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_name\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"email\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"gender\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"ip_address\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"last_login\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"account_balance\" : {\"name\" : \"org.apache.kafka.connect.data.Decimal\",\"type\" : \"BYTES\",\"version\" : 1,\"parameters\" : {\"scale\" : \"2\"},\"isOptional\" : true},\"country\" : {\"type\" : \"STRING\",\"isOptional\" : true},\"favorite_color\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}
    
  3. Load the SFTP JSON Source connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load JsonSftp --config sftp.properties
    

    Important

    Don’t use the Confluent CLI in production environments.
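
As an optional sanity check on steps 1 and 2, the following Python sketch unescapes the key.schema value from sftp.properties and checks that records shaped like those in json-sftp-source.json provide every non-optional field. It is not part of the connector. The helper names (parse_schema_property, missing_or_mistyped) and the type mapping are hypothetical, the sample records are trimmed to a few fields for brevity, and the check only approximates how Connect schema types relate to JSON values.

```python
import json

# key.schema exactly as written in sftp.properties (quotes escaped with backslashes).
KEY_SCHEMA_PROPERTY = r'{\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : false,\"fieldSchemas\" : {\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false}}}'

# Records shaped like the lines of json-sftp-source.json, trimmed to a few fields.
SAMPLE_LINES = [
    '{"id":1,"first_name":"Roscoe","last_name":"Brentnall","account_balance":1450.68}',
    '{"id":2,"first_name":"Gregoire","last_name":"Fentem","account_balance":1392.37}',
]

# Assumption: a rough mapping from Connect schema types to Python types,
# used only for this check. Types not listed here are not type-checked.
PYTHON_TYPES = {"INT64": int, "STRING": str}


def parse_schema_property(value: str) -> dict:
    """Undo the backslash escaping of quotes and parse the schema as JSON."""
    return json.loads(value.replace('\\"', '"'))


def missing_or_mistyped(record: dict, schema: dict) -> list:
    """Return the names of fields that are required but absent, or of the wrong type."""
    problems = []
    for name, field in schema["fieldSchemas"].items():
        value = record.get(name)
        if value is None:
            # A missing field is only a problem when the schema marks it as required.
            if not field.get("isOptional", False):
                problems.append(name)
            continue
        expected = PYTHON_TYPES.get(field["type"])
        if expected is not None and not isinstance(value, expected):
            problems.append(name)
    return problems


key_schema = parse_schema_property(KEY_SCHEMA_PROPERTY)
for line in SAMPLE_LINES:
    record = json.loads(line)
    # Print the record id followed by any offending field names.
    print(record["id"], missing_or_mistyped(record, key_schema))
```

Each line of output shows a record id followed by the list of offending field names; an empty list means the record satisfies the key schema under the assumptions above. The same functions can be pointed at the value.schema string to check the remaining fields.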

Configuration Properties

For connector-specific configuration properties, see SFTP Source Connector Configuration Properties.