Binary File Source Connector for Confluent Platform
The Binary File Source connector reads an entire file as a byte array
and writes the data to Kafka. To use this connector, specify the name of the
connector class in the connector.class configuration property:
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirBinaryFileSourceConnector
Important
Each file is read as a single byte array, so a large file can cause the process to run out of memory or to attempt to send a message to Kafka that exceeds the maximum message size. If this happens, the connector throws an exception.
The recommended converter is the ByteArrayConverter. For example:
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
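Putting the pieces together, a minimal connector configuration might look like the following sketch. The connector name, topic, directory paths, and file pattern are illustrative; substitute values for your environment:

```properties
name=binary-file-source
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirBinaryFileSourceConnector
# Illustrative topic and directories -- substitute your own.
topic=binary-files
input.path=/var/spooldir/input
error.path=/var/spooldir/error
finished.path=/var/spooldir/finished
# Hypothetical pattern: only pick up files ending in .bin
input.file.pattern=^.*\.bin$
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```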
Configuration Properties
The Binary File Source connector can be configured using a variety of configuration properties.
General
The following are general configuration properties.
topic
The Kafka topic to write the data to.
Importance: HIGH
Type: STRING
batch.size
The number of records that should be returned with each batch.
Importance: LOW
Type: INT
Default value: 1000
empty.poll.wait.ms
The amount of time, in milliseconds (ms), to wait if a poll returns an empty list of records.
Importance: LOW
Type: LONG
Default value: 500
Valid values: [1,…,9223372036854775807]
task.count
An internal setting used by the connector to instruct a task which files to select. The connector will override this setting.
Importance: LOW
Type: INT
Default value: 1
Valid values: [1,…]
task.index
An internal setting used by the Spool Dir Connector to select which files are processed by each task. The connector will override this setting.
Importance: LOW
Type: INT
Default value: 0
Valid values: [0,…]
File System
Use the following properties to configure your file system.
error.path
The directory in which files that have errors are placed. This directory must exist and be writable by the user running Kafka Connect.
Importance: HIGH
Type: STRING
Valid values: Absolute path to a directory that exists and is writable.
input.file.pattern
Regular expression to check input filenames against. This expression must
match the entire filename (the equivalent of Matcher.matches()).
Importance: HIGH
Type: STRING
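Because input.file.pattern uses Matcher.matches() semantics, a pattern that matches only part of a filename will not select the file. A small sketch of this behavior (the pattern and filenames are hypothetical):

```java
import java.util.regex.Pattern;

public class InputFilePattern {
    // Hypothetical pattern: accept filenames such as "orders-2023-01-01.bin".
    static final Pattern PATTERN = Pattern.compile("^orders-.*\\.bin$");

    // Mirrors the connector's use of Matcher.matches(): the WHOLE
    // filename must match, not just a substring.
    static boolean accepts(String filename) {
        return PATTERN.matcher(filename).matches();
    }

    public static void main(String[] args) {
        System.out.println(accepts("orders-2023-01-01.bin"));    // true
        System.out.println(accepts("orders-2023-01-01.bin.gz")); // false: trailing ".gz" breaks the full match
    }
}
```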
input.path
The directory from which files are read and processed. This directory must exist and be writable by the user running Kafka Connect.
Importance: HIGH
Type: STRING
Valid values: Absolute path to a directory that exists and is writable.
finished.path
The directory to place files that have been processed. This directory must exist and be writable by the user running Kafka Connect.
Importance: HIGH
Type: STRING
halt.on.error
Determines whether the task should halt when it encounters an error or continue to the next file.
Importance: HIGH
Type: BOOLEAN
Default value: true
cleanup.policy
Determines how the connector cleans up files that have been processed.
A value of NONE leaves the files in place, which causes them to be
reprocessed if the connector is restarted. DELETE removes the file from the
filesystem. MOVE moves the file to a finished directory. MOVEBYDATE moves
the file to a finished directory with subdirectories by date.
Importance: MEDIUM
Type: STRING
Default value: MOVE
Valid values: NONE, DELETE, MOVE, MOVEBYDATE
task.partitioner
The task partitioner implementation is used when the connector is configured to use more than one task. This is used by each task to identify which files will be processed by that task. This ensures that each file is only assigned to one task.
Importance: MEDIUM
Type: STRING
Default value: ByName
Valid values: ByName
file.buffer.size.bytes
The size of the buffer for the BufferedInputStream that is used to interact with the file system.
Importance: LOW
Type: INT
Default value: 131072
Valid values: [1,…]
file.minimum.age.ms
The amount of time in milliseconds after the file was last written to before the file can be processed.
Importance: LOW
Type: LONG
Default value: 0
Valid values: [0,…]
files.sort.attributes
The attributes each file will use to determine the sort order. Name is the
name of the file. Length is the length of the file, preferring larger files
first. LastModified is the LastModified attribute of the file, preferring
older files first.
Importance: LOW
Type: LIST
Default value: [NameAsc]
Valid values: NameAsc, NameDesc, LengthAsc, LengthDesc, LastModifiedAsc, LastModifiedDesc
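As an illustration, processing the oldest files first and breaking ties by name could be expressed as:

```properties
# Process oldest files first; fall back to name order for ties (illustrative).
files.sort.attributes=LastModifiedAsc,NameAsc
```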
processing.file.extension
Before a file is processed, a flag is created in its directory to indicate the file is being handled. The flag file has the same name as the file, but with this property appended as a suffix.
Importance: LOW
Type: STRING
Default value: .PROCESSING
Valid values: regex( ^.*\..+$ )
Timestamps
Use the following properties for configuring the timestamp.
timestamp.mode
Determines how the connector will set the timestamp for the
[ConnectRecord](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/connector/ConnectRecord.html#timestamp()).
If set to FIELD, the timestamp is read from a field in the value. This field
cannot be optional and must be a
[Timestamp](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.html).
Specify the field in timestamp.field. If set to FILE_TIME, the last modified
time of the file is used. If set to PROCESS_TIME, the time the record is read
is used.
Importance: MEDIUM
Type: STRING
Default value: PROCESS_TIME
Valid values: FIELD, FILE_TIME, PROCESS_TIME
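For example, to take the record timestamp from a field in the value (the field name below is hypothetical; it must refer to a non-optional Timestamp field):

```properties
# "event_time" is a hypothetical field name in the record value.
timestamp.mode=FIELD
timestamp.field=event_time
```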