Binary File Source Connector for Confluent Platform¶
The Binary File Source connector is used to read an entire file as a byte array
and write the data to Kafka. To use this connector, use a connector configuration
that specifies the name of the connector class in the connector.class
configuration property:
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirBinaryFileSourceConnector
Important
- Large files will be read as a single byte array. This means that the process could run out of memory or try to send a message to Kafka that is greater than the maximum message size. If this happens, the connector will throw an exception.
- The recommended converter is the ByteArrayConverter (for example, value.converter=org.apache.kafka.connect.converters.ByteArrayConverter).
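For reference, a minimal standalone configuration for this connector might look like the following sketch. The topic name and directory paths are hypothetical placeholders; adjust them for your environment.

```properties
name=binary-file-source
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirBinaryFileSourceConnector
tasks.max=1
# Emit the raw bytes unchanged rather than attempting schema conversion.
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# Hypothetical topic and directories; the directories must exist and be
# writable by the user running Kafka Connect before the connector starts.
topic=binary-files
input.path=/var/spooldir/input
error.path=/var/spooldir/error
finished.path=/var/spooldir/finished
input.file.pattern=^.*\.bin$
```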
Configuration Properties¶
The Binary File Source connector can be configured using a variety of configuration properties.
General¶
The following are general configuration properties.
batch.size¶
The number of records that should be returned with each batch.
- Importance: LOW
- Type: INT
- Default value: 1000
empty.poll.wait.ms¶
The amount of time, in milliseconds (ms), to wait if a poll returns an empty list of records.
- Importance: LOW
- Type: LONG
- Default value: 500
- Valid values: [1,…,9223372036854775807]
task.count¶
An internal setting used by the connector to instruct each task which files to select. The connector will override this setting.
- Importance: LOW
- Type: INT
- Default value: 1
- Valid values: [1,…]
task.index¶
An internal setting used by the Spool Dir Connector to select which files are processed by each task. The connector will override this setting.
- Importance: LOW
- Type: INT
- Default value: 0
- Valid values: [0,…]
File System¶
Use the following properties to configure your file system.
error.path¶
The directory in which to place files that have errors. This directory must exist and be writable by the user running Kafka Connect.
- Importance: HIGH
- Type: STRING
- Valid values: Absolute path to a directory that exists and is writable.
input.file.pattern¶
Regular expression to check input filenames against. This expression must match the entire filename (the equivalent of Matcher.matches()).
- Importance: HIGH
- Type: STRING
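Because the pattern must match the whole filename, an unanchored substring will not behave the way it would with Matcher.find(). For example, to pick up only files with a hypothetical .dat extension:

```properties
# Matches the entire filename: any name ending in ".dat".
# A bare substring like "dat" would NOT match, because the full
# filename must satisfy the expression.
input.file.pattern=^.*\.dat$
```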
input.path¶
The directory from which files are read and processed. This directory must exist and be writable by the user running Kafka Connect.
- Importance: HIGH
- Type: STRING
- Valid values: Absolute path to a directory that exists and is writable.
finished.path¶
The directory to place files that have been processed. This directory must exist and be writable by the user running Kafka Connect.
- Importance: HIGH
- Type: STRING
halt.on.error¶
Determines whether the task should halt when it encounters an error or continue to the next file.
- Importance: HIGH
- Type: BOOLEAN
- Default value: true
cleanup.policy¶
Determines how the connector should clean up files that have been processed. A value of NONE leaves the files in place, which causes them to be reprocessed if the connector is restarted. DELETE removes the file from the filesystem. MOVE moves the file to a finished directory. MOVEBYDATE moves the file to a finished directory with subdirectories by date.
- Importance: MEDIUM
- Type: STRING
- Default value: MOVE
- Valid values: NONE, DELETE, MOVE, MOVEBYDATE
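As a sketch, combining MOVEBYDATE with a finished directory places each processed file in a date-based subdirectory of finished.path (the path below is a hypothetical example):

```properties
cleanup.policy=MOVEBYDATE
# Processed files are moved into date-named subdirectories of this
# directory instead of directly into it.
finished.path=/var/spooldir/finished
```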
task.partitioner¶
The task partitioner implementation is used when the connector is configured to use more than one task. This is used by each task to identify which files will be processed by that task. This ensures that each file is only assigned to one task.
- Importance: MEDIUM
- Type: STRING
- Default value: ByName
- Valid values:
ByName
file.buffer.size.bytes¶
The size, in bytes, of the buffer for the BufferedInputStream that is used to interact with the file system.
- Importance: LOW
- Type: INT
- Default value: 131072
- Valid values: [1,…]
file.minimum.age.ms¶
The amount of time, in milliseconds, that must elapse after the file was last written before the file can be processed.
- Importance: LOW
- Type: LONG
- Default value: 0
- Valid values: [0,…]
files.sort.attributes¶
The attributes each file will use to determine the sort order. Name is the name of the file. Length is the length of the file, preferring larger files first. LastModified is the LastModified attribute of the file, preferring older files first.
- Importance: LOW
- Type: LIST
- Default value: [NameAsc]
- Valid values: NameAsc, NameDesc, LengthAsc, LengthDesc, LastModifiedAsc, LastModifiedDesc
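Since this property is a list, multiple attributes can be combined as tie-breakers. For example, a hypothetical ordering that processes the oldest files first and breaks ties by name:

```properties
# Oldest files first; files with identical modification times
# are then ordered by filename ascending.
files.sort.attributes=LastModifiedAsc,NameAsc
```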
processing.file.extension¶
Before a file is processed, a flag is created in its directory to indicate the file is being handled. The flag file has the same name as the file, but with this property appended as a suffix.
- Importance: LOW
- Type: STRING
- Default value: .PROCESSING
- Valid values: regex( ^.*\..+$ )
Timestamps¶
Use the following properties for configuring the timestamp.
timestamp.mode¶
Determines how the connector will set the timestamp for the
[ConnectRecord](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/connector/ConnectRecord.html#timestamp()).
If set to FIELD, the timestamp will be read from a field in the value. This field cannot be optional and must be a
[Timestamp](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.html);
specify the field in timestamp.field. If set to FILE_TIME, the last modified time of the file will be used. If set to PROCESS_TIME, the time the record is read will be used.
- Importance: MEDIUM
- Type: STRING
- Default value: PROCESS_TIME
- Valid values: FIELD, FILE_TIME, PROCESS_TIME
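For instance, to stamp each record with the source file's last-modified time rather than the time the record is read:

```properties
# Use the file's last-modified time as the record timestamp
# instead of the default PROCESS_TIME.
timestamp.mode=FILE_TIME
```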