Spool Dir Connectors for Confluent Platform

The Kafka Connect Spool Dir Source connectors watch a directory for files and read the data as new files are written to the input directory. Once a file has been read, it will be placed into the configured finished.path directory. Each record in the input file will be converted based on the user-supplied schema or an auto-generated schema.

The following connectors are included with the Connect Spool Dir Source Connector package:

Features

Each of the Spool Dir Source connectors include the following features:

At least once delivery

Each connector in the Spool Dir Source connector package guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.

Multiple tasks

Each connector in the Spool Dir Source connector package supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Multiple tasks may improve performance when moving a large amount of data.

Warning

The Spool Dir Source connector may fail when running many tasks. This might occur if you use a regex in the input.file.pattern property that causes the connector to include .processing files–for example, "input.file.pattern"="SAMPLE.*"–in this way, the connector won’t exclude the files currently being processed and will output duplicate records and fail. To work around this issue, you must provide an extension in the input file format–for example, "input.file.pattern"="SAMPLE.*.txt". This modifies the regular expression to match only .txt files and allows the connector to process the correct files.

Install the Spool Dir Source Connector Package

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • An installation of the latest (latest) connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin installjcustenborder/kafka-connect-spooldir:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin installjcustenborder/kafka-connect-spooldir:1.0.31
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

The Spool Dir connector is an open source connector and does not require a Confluent Enterprise License.

Configuration Properties

For a complete list of configuration properties, see the specific connector documentation.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Quick Start

The following steps show the SpoolDirCsvSourceConnector loading a mock CSV file to a Kafka topic named spooldir-testing-topic. The other connectors are similar but load from different file types.

Prerequisites
  1. Install the connector through the Confluent Hub Client.

    # run from your Confluent Platform installation directory
    confluent connect plugin installjcustenborder/kafka-connect-spooldir:latest
    

    Tip

    By default, the connector will install the plugin into the share/confluent-hub-components directory and add the directory to the plugin path. For the plugin path change to take effect, you must restart the Connect worker.

  2. Start Confluent Platform using the Confluent CLI confluent local commands.

    confluent local services start
    
  3. Create a data directory and generate test data.

    mkdir data && curl "https://api.mockaroo.com/api/58605010?count=1000&key=25fd9c80" > "data/csv-spooldir-source.csv"
    
  4. Set up directories for files with errors and files that finished successfully.

    mkdir error && mkdir finished
    
  5. Create a spooldir.json file with the following contents:

    {
      "name": "CsvSpoolDir",
      "config": {
        "tasks.max": "1",
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "input.path": "/path/to/data",
        "input.file.pattern": "csv-spooldir-source.csv",
        "error.path": "/path/to/error",
        "finished.path": "/path/to/finished",
        "halt.on.error": "false",
        "topic": "spooldir-testing-topic",
        "csv.first.row.as.header": "true",
        "schema.generation.enabled": "true"
      }
    }
    
  6. Load the SpoolDir CSV Source connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services connect connector load spooldir --config spooldir.json
    

    Important

    Don’t use the confluent local commands in production environments.

  7. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status spooldir
    
  8. Confirm that the messages are being sent to Kafka.

    kafka-avro-console-consumer \
        --bootstrap-server localhost:9092 \
        --property schema.registry.url=http://localhost:8081 \
        --topic spooldir-testing-topic \
        --from-beginning | jq '.'
    
  9. Confirm that the source CSV file has been moved to the finished directory.