Important

You are viewing documentation for an older version of Confluent Platform.

Schemaless JSON Source Connector for Confluent Platform

The Kafka Connect Schemaless JSON Source connector streams JSON files from an SFTP directory to a Kafka topic. It does not try to convert the JSON records to a schema, so the recommended value converter is the StringConverter.

value.converter=org.apache.kafka.connect.storage.StringConverter

To use this connector, specify the name of the connector class in the connector.class configuration property, as shown below:

connector.class=io.confluent.connect.sftp.SftpSchemaLessJsonSourceConnector

JSON Schemaless Source Connector Example

This example follows the same steps as the Quick Start. Review the Quick Start for help running the Confluent Platform and installing the SFTP connector package.

Prerequisites
  1. Generate a JSON dataset using the command below:

    echo $'{"id":1,"first_name":"Roscoe","last_name":"Brentnall","email":"rbrentnall0@mediafire.com","gender":"Male","ip_address":"202.84.142.254","last_login":"2018-02-12T06:26:23Z","account_balance":1450.68,"country":"CZ","favorite_color":"#4eaefa"}\n{"id":2,"first_name":"Gregoire","last_name":"Fentem","email":"gfentem1@nsw.gov.au","gender":"Male","ip_address":"221.159.106.63","last_login":"2015-03-27T00:29:56Z","account_balance":1392.37,"country":"ID","favorite_color":"#e8f686"}' > "json-sftp-source.json"
    
  2. Create an sftp.properties file with the following contents:

    name=SchemaLessJsonSftp
    tasks.max=1
    connector.class=io.confluent.connect.sftp.SftpSchemaLessJsonSourceConnector
    input.path=/path/to/data
    error.path=/path/to/error
    finished.path=/path/to/finished
    input.file.pattern=json-sftp-source.json
    behavior.on.error=IGNORE
    sftp.username=username
    sftp.password=password
    sftp.host=localhost
    sftp.port=22
    kafka.topic=sftp-schemaless-json-topic
    value.converter=org.apache.kafka.connect.storage.StringConverter
    
  3. Load the SFTP Schemaless JSON Source Connector using the Confluent CLI confluent local load command.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local load SchemaLessJsonSftp -- -d sftp.properties
    

    Important

    The confluent local commands are intended for a single-node development environment and are not suitable for a production environment. The data they produce is transient and intended to be temporary. For production-ready workflows, see Install and Upgrade Confluent Platform.
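
The dataset generated in step 1 holds two JSON objects separated by a newline. Before copying such a file to the SFTP input directory, it may be useful to confirm that every line parses as JSON. The following is a minimal Python sketch of that check, not part of the connector; the helper name parse_json_lines and the shortened sample string are illustrative assumptions.

```python
import json


def parse_json_lines(text):
    """Parse newline-separated JSON objects, skipping blank lines."""
    records = []
    for line_number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as exc:
            raise ValueError(f"line {line_number} is not valid JSON: {exc}") from exc
    return records


# Shortened stand-in for the contents of json-sftp-source.json (assumption).
sample = '{"id":1,"first_name":"Roscoe"}\n{"id":2,"first_name":"Gregoire"}\n'
records = parse_json_lines(sample)
print(len(records), [record["id"] for record in records])  # 2 [1, 2]
```

To check the real file, read it with open("json-sftp-source.json").read() and pass the result to the same function.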

Configuration Properties

Connector-specific configuration properties are described below.

General

kafka.topic

The Kafka topic to write the data to.

  • Importance: HIGH
  • Type: STRING
batch.size

The number of records that should be returned with each batch.

  • Importance: LOW
  • Type: INT
  • Default Value: 1000
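
As an illustration of what a batch limit means, the sketch below splits a stream of records into batches of at most batch_size records. It is a hedged Python model of the idea only, not the connector's implementation; the function name batches is hypothetical.

```python
from itertools import islice


def batches(records, batch_size=1000):
    """Yield lists of at most batch_size records until the input is exhausted."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# 2500 records with the default batch size give two full batches and a partial one.
sizes = [len(batch) for batch in batches(range(2500), batch_size=1000)]
print(sizes)  # [1000, 1000, 500]
```
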
empty.poll.wait.ms

The amount of time to wait if a poll returns an empty list of records.

  • Importance: LOW
  • Type: LONG
  • Default Value: 250
  • Validator: [1,…,9223372036854775807]

Connection

sftp.host

The SFTP host to connect to.

  • Type: string
  • Default: localhost
  • Importance: high
sftp.port

Port number of the SFTP server.

  • Type: int
  • Default: 22
  • Importance: medium
sftp.username

Username for the SFTP server.

  • Type: string
  • Default: foo
  • Importance: high
sftp.password

Password for the SFTP server.

  • Type: string
  • Default: pass
  • Importance: high

Security

tls.private.key

Private key that will be used for public-key authentication.

  • Type: password
  • Default: [hidden]
  • Importance: low
tls.public.key

Public key that will be used to decrypt the private key if the given private key is encrypted.

  • Type: password
  • Default: [hidden]
  • Importance: low
tls.passphrase

Passphrase that will be used to decrypt the private key if the given private key is encrypted.

  • Type: password
  • Default: [hidden]
  • Importance: low
tls.pemfile

Path to the PEM file.

  • Type: string
  • Default: “”
  • Importance: low

Proxy

sftp.proxy.url

Proxy URL for the SFTP connection.

  • Type: string
  • Default: “”
  • Importance: low
proxy.username

Proxy username for the SFTP server, if a proxy is used.

  • Type: string
  • Default: null
  • Importance: low
proxy.password

Proxy password for the SFTP server, if a proxy is used.

  • Type: string
  • Default: null
  • Importance: low

File System

input.path

The directory from which Kafka Connect reads the files to process. This directory must exist and be writable by the user running Connect.

  • Importance: HIGH
  • Type: STRING
  • Validator: Absolute path to an SFTP directory that exists and is writable.
input.file.pattern

Regular expression to check input file names against. The expression must match the entire file name, which is equivalent to Matcher.matches() in Java.

  • Importance: HIGH
  • Type: STRING
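
The whole-name matching rule can be illustrated with Python's re.fullmatch, which behaves like Java's Matcher.matches() for simple patterns such as these. This is a sketch for intuition only: the connector evaluates the pattern with Java regular expressions, and the helper name matches_entire_name is hypothetical.

```python
import re


def matches_entire_name(pattern, filename):
    """Return True only if the pattern matches the whole file name."""
    return re.fullmatch(pattern, filename) is not None


# The pattern from the example configuration matches the exact file name.
print(matches_entire_name(r"json-sftp-source.json", "json-sftp-source.json"))  # True
# A longer name is rejected because the match must cover the entire name.
print(matches_entire_name(r"json-sftp-source.json", "json-sftp-source.json.bak"))  # False
# An unescaped dot matches any character, so this name is accepted too.
print(matches_entire_name(r"json-sftp-source.json", "json-sftp-sourceXjson"))  # True
# Escaping the dot and adding a wildcard selects every file ending in .json.
print(matches_entire_name(r".*\.json", "json-sftp-source.json"))  # True
```

Escaping the dot, as in json-sftp-source\.json, avoids the accidental match shown in the third call.
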
finished.path

The directory where Connect puts files that are successfully processed. This directory must exist and be writable by the user running Connect.

  • Importance: HIGH
  • Type: STRING
  • Validator: Absolute path to an SFTP directory that exists and is writable.
error.path

The directory where Connect places files that have errors. This directory must exist and be writable by the user running Kafka Connect.

  • Importance: HIGH
  • Type: STRING
  • Validator: Absolute path to an SFTP directory that exists and is writable.
behavior.on.error

Sets how the connector behaves when errors are encountered while processing records. FAIL stops the connector when any error occurs. IGNORE skips the current file and continues with the next file. LOG logs the error message and then continues with the next file.

  • Importance: HIGH
  • Type: STRING
  • Default Value: FAIL
  • Validator: Matches: FAIL, IGNORE, LOG
cleanup.policy

Sets how the connector should clean up files that are successfully processed. NONE leaves the files in place. Files left in place may be reprocessed if the connector is restarted. DELETE removes the file from the filesystem. MOVE (the default) moves the file to the finished.path directory.

  • Importance: MEDIUM
  • Type: STRING
  • Default Value: MOVE
  • Validator: Matches: NONE, DELETE, MOVE
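
The three policies can be modeled on a local filesystem as follows. This is a hedged Python sketch of the described semantics, not the connector's code: the connector works against an SFTP server, and the function name clean_up and the temporary directories are assumptions made for illustration.

```python
import shutil
import tempfile
from pathlib import Path


def clean_up(processed_file, policy, finished_dir):
    """Apply a cleanup policy; return the file's final path, or None if deleted."""
    processed_file = Path(processed_file)
    if policy == "NONE":
        return processed_file  # left in place, so it may be picked up again
    if policy == "DELETE":
        processed_file.unlink()
        return None
    if policy == "MOVE":
        target = Path(finished_dir) / processed_file.name
        shutil.move(str(processed_file), str(target))
        return target
    raise ValueError(f"unknown cleanup policy: {policy}")


with tempfile.TemporaryDirectory() as root:
    input_dir = Path(root) / "data"
    finished_dir = Path(root) / "finished"
    input_dir.mkdir()
    finished_dir.mkdir()
    source = input_dir / "json-sftp-source.json"
    source.write_text('{"id":1}\n')
    moved = clean_up(source, "MOVE", finished_dir)
    print(moved.name, source.exists())  # json-sftp-source.json False
```
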
file.minimum.age.ms

The amount of time in milliseconds after the file was last written to before the file can be processed.

  • Importance: LOW
  • Type: LONG
  • Default Value: 0
  • Validator: [0,…]
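
The age check amounts to comparing the time since the last write with the configured minimum. A minimal Python sketch follows; the function name is_old_enough is hypothetical, and treating the boundary as inclusive is an assumption, since the text does not say whether a file exactly at the minimum age qualifies.

```python
def is_old_enough(last_modified_ms, now_ms, minimum_age_ms=0):
    """Return True if at least minimum_age_ms have passed since the last write."""
    return now_ms - last_modified_ms >= minimum_age_ms


# A file last written 6 seconds ago, checked against two thresholds.
print(is_old_enough(last_modified_ms=4_000, now_ms=10_000, minimum_age_ms=5_000))  # True
print(is_old_enough(last_modified_ms=4_000, now_ms=10_000, minimum_age_ms=7_000))  # False
# With the default of 0, every file qualifies immediately.
print(is_old_enough(last_modified_ms=10_000, now_ms=10_000))  # True
```

A non-zero value can help avoid picking up files that another process is still writing.
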
processing.file.extension

Before a file is processed, it is renamed to indicate that it is currently being processed. The value of this setting is appended to the end of the file name.

  • Importance: LOW
  • Type: STRING
  • Default Value: .PROCESSING
  • Validator: Matches regex( ^.*\..+$ )
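
The sketch below shows how an extension could be validated and appended, assuming the validator pattern is ^.*\..+$ (a literal dot followed by at least one character). It is an illustrative Python model, not the connector's code; the names is_valid_extension and processing_name are hypothetical.

```python
import re

# Assumed validator pattern: anything, then a literal dot, then one or more characters.
EXTENSION_PATTERN = re.compile(r"^.*\..+$")


def is_valid_extension(extension):
    """Return True if the extension satisfies the assumed validator pattern."""
    return EXTENSION_PATTERN.fullmatch(extension) is not None


def processing_name(filename, extension=".PROCESSING"):
    """Return the name a file would carry while it is being processed."""
    if not is_valid_extension(extension):
        raise ValueError(f"invalid processing extension: {extension!r}")
    return filename + extension


print(is_valid_extension(".PROCESSING"))  # True
print(is_valid_extension("PROCESSING"))  # False
print(processing_name("json-sftp-source.json"))  # json-sftp-source.json.PROCESSING
```
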

Timestamps

timestamp.mode

Determines how the connector sets the timestamp for the ConnectRecord. If set to FIELD, the timestamp is read from a field in the value. This field cannot be optional and must be a Timestamp. Specify the field in timestamp.field. If set to FILE_TIME, the time the file was last modified is used. If set to PROCESS_TIME (the default), the time at which the record is read is used.

  • Importance: MEDIUM
  • Type: STRING
  • Default Value: PROCESS_TIME
  • Validator: Matches: FIELD, FILE_TIME, PROCESS_TIME
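
The three modes can be summarized as a small selection function. This Python sketch only models the described behavior: the function name record_timestamp_ms, the dict-shaped value, and the use of epoch milliseconds for every timestamp are assumptions, not the connector's API.

```python
def record_timestamp_ms(mode, read_time_ms, file_modified_ms,
                        value=None, timestamp_field=None):
    """Pick a record timestamp (epoch milliseconds) according to timestamp.mode."""
    if mode == "PROCESS_TIME":
        return read_time_ms
    if mode == "FILE_TIME":
        return file_modified_ms
    if mode == "FIELD":
        # The field is required: a missing field is an error, not a fallback.
        if value is None or timestamp_field is None or timestamp_field not in value:
            raise ValueError("FIELD mode needs timestamp.field to name a field in the value")
        return value[timestamp_field]
    raise ValueError(f"unknown timestamp mode: {mode}")


# Hypothetical record value carrying its own timestamp in epoch milliseconds.
value = {"id": 1, "last_login_ms": 1_518_416_783_000}
print(record_timestamp_ms("PROCESS_TIME", 30, 20))  # 30
print(record_timestamp_ms("FILE_TIME", 30, 20))  # 20
print(record_timestamp_ms("FIELD", 30, 20, value, "last_login_ms"))  # 1518416783000
```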