Spool Dir Connectors for Confluent Platform¶
The Kafka Connect Spool Dir Source connectors watch a directory for files and
read the data as new files are written to the input directory. Once a file has
been read, it will be placed into the configured finished.path
directory.
Each record in the input file will be converted based on the user-supplied
schema or an auto-generated schema.
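This behavior is driven by a few configuration properties. The fragment below is a minimal sketch (the paths are illustrative): with schema.generation.enabled set to true, the connector infers a schema from the data, and with it disabled you must supply the schema yourself (see the connector's configuration reference for the exact schema properties).
{
  "input.path": "/var/spooldir/input",
  "finished.path": "/var/spooldir/finished",
  "error.path": "/var/spooldir/error",
  "schema.generation.enabled": "true"
}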
The following connectors are included with the Connect Spool Dir Source Connector package:
- CSV Source Connector for Confluent Platform
- JSON Source Connector for Confluent Platform
- Schemaless JSON Source Connector for Confluent Platform
- Line-Delimited Source Connector for Confluent Platform
- Extended Log File Format Source Connector for Confluent Platform
- Binary File Source Connector for Confluent Platform
Features¶
Each of the Spool Dir Source connectors includes the following features:
At least once delivery¶
Each connector in the Spool Dir Source connector package guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
Multiple tasks¶
Each connector in the Spool Dir Source connector package supports running one or
more tasks. You can specify the number of tasks in the tasks.max
configuration parameter. Multiple tasks may improve performance when moving a
large amount of data.
Warning
The Spool Dir Source connector may fail when running many tasks. This can occur if the regex in the input.file.pattern property also matches the .processing files the connector creates for files it is currently reading (for example, "input.file.pattern"="SAMPLE.*"). In that case, the connector does not exclude the files currently being processed, outputs duplicate records, and fails. To work around this issue, include the input file extension in the pattern (for example, "input.file.pattern"="SAMPLE.*.txt"). This restricts the regular expression to .txt files and allows the connector to process the correct files.
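For example, a configuration fragment along these lines runs multiple tasks while matching only the intended .txt input files (the values are illustrative; the dot before the extension is escaped once for the regex and once more for JSON):
{
  "tasks.max": "4",
  "input.file.pattern": "SAMPLE.*\\.txt"
}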
Install the Spool Dir Source Connector Package¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
An installation of the latest (latest) connector version. To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install jcustenborder/kafka-connect-spooldir:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent connect plugin install jcustenborder/kafka-connect-spooldir:1.0.31
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
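Once a Connect worker is running, you can confirm that it picked up the plugin by querying the Connect REST API. This assumes the default REST port 8083 and that jq is installed:
curl -s http://localhost:8083/connector-plugins | jq '.[].class' | grep -i spooldir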
License¶
The Spool Dir connector is an open source connector and does not require a Confluent Enterprise License.
Configuration Properties¶
For a complete list of configuration properties, see the specific connector documentation.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
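As a rough sketch, connecting a self-managed Connect worker to Confluent Cloud comes down to pointing the worker's Kafka client properties at the cloud cluster with SASL/PLAIN credentials. The placeholders below are illustrative, and the linked page covers the full set of properties, including the producer- and consumer-prefixed variants:
# Worker properties (values are placeholders)
bootstrap.servers=<CLOUD_BOOTSTRAP_ENDPOINT>:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<API_KEY>" password="<API_SECRET>";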
Quick Start¶
The following steps show the SpoolDirCsvSourceConnector loading a mock CSV file to a Kafka topic named spooldir-testing-topic. The other connectors are similar but load from different file types.
Prerequisites:
- Confluent Platform
- Confluent CLI (requires separate installation)
Install the connector using the confluent connect plugin install command.
# run from your Confluent Platform installation directory
confluent connect plugin install jcustenborder/kafka-connect-spooldir:latest
Tip
By default, the connector will install the plugin into the share/confluent-hub-components directory and add the directory to the plugin path. For the plugin path change to take effect, you must restart the Connect worker.
Start Confluent Platform using the Confluent CLI confluent local commands.
confluent local services start
Create a data directory and generate test data.
mkdir data && curl "https://api.mockaroo.com/api/58605010?count=1000&key=25fd9c80" > "data/csv-spooldir-source.csv"
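Optionally, inspect the first few lines of the generated file. The first row should be a header, which the connector configuration in the next steps relies on (csv.first.row.as.header):
head -5 data/csv-spooldir-source.csv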
Set up directories for files with errors and files that finished successfully.
mkdir error && mkdir finished
Create a spooldir.json file with the following contents:
{
  "name": "CsvSpoolDir",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "input.path": "/path/to/data",
    "input.file.pattern": "csv-spooldir-source.csv",
    "error.path": "/path/to/error",
    "finished.path": "/path/to/finished",
    "halt.on.error": "false",
    "topic": "spooldir-testing-topic",
    "csv.first.row.as.header": "true",
    "schema.generation.enabled": "true"
  }
}
Load the SpoolDir CSV Source connector.
Caution
You must include a double dash (--) between the topic name and your flag.
confluent local services connect connector load spooldir --config spooldir.json
Important
Don’t use the confluent local commands in production environments.
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status spooldir
Confirm that the messages are being sent to Kafka.
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic spooldir-testing-topic \
  --from-beginning | jq '.'
Confirm that the source CSV file has been moved to the finished directory.
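A quick way to verify is to list both directories; the source file should now appear under finished and no longer under data:
ls data/ finished/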