Binary File Source Connector for Confluent Platform

The Binary File Source connector reads an entire file as a byte array and writes the data to Kafka. To use this connector, specify the name of the connector class in the connector.class configuration property:

connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirBinaryFileSourceConnector

Important

  • Each file is read as a single byte array. A large file can therefore cause the process to run out of memory or to try to send a message to Kafka that is greater than the maximum message size. If this happens, the connector will throw an exception.
  • The recommended converter to use is the ByteArrayConverter (for example, value.converter=org.apache.kafka.connect.storage.ByteArrayConverter).
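
The size concern in the first point can be illustrated with a pre-check that runs before a file is dropped into the input directory. This is a minimal sketch and not part of the connector: the class name, the method names, and the 1,048,576-byte limit are assumptions chosen for illustration, and the effective limit depends on your broker, topic, and producer settings.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class WholeFileReadSketch {
    // Assumed limit for illustration only; the effective limit depends on
    // your broker, topic, and producer settings.
    static final long ASSUMED_MAX_MESSAGE_BYTES = 1_048_576L;

    // True when a file of this size would fit in one message under the limit.
    static boolean fitsInOneMessage(long fileSizeBytes, long maxMessageBytes) {
        return fileSizeBytes <= maxMessageBytes;
    }

    // Reads the whole file into memory, refusing files above the limit.
    static byte[] readWholeFile(Path file, long maxMessageBytes) throws IOException {
        long size = Files.size(file);
        if (!fitsInOneMessage(size, maxMessageBytes)) {
            throw new IllegalStateException(
                file + " is " + size + " bytes, above the assumed limit of " + maxMessageBytes);
        }
        return Files.readAllBytes(file);
    }
}
```

Checking the size first avoids allocating a byte array for a file that could never be sent as one message.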

Binary File Source Connector Example

This example uses the FromXml transformation to parse the binary data according to the supplied XSD, which allows files to be converted to strongly typed data. Once Kafka Connect has converted the data, it can be stored as Avro, JSON, or any other format supported by the converter the user chooses.

Configuration
{
  "connector.class" : "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirBinaryFileSourceConnector",
  "finished.path" : "/tmp",
  "input.path" : "/tmp",
  "error.path" : "/tmp",
  "input.file.pattern" : "^users\\d+\\.bin$",
  "topic" : "users",
  "transforms" : "FromXml",
  "transforms.FromXml.type" : "com.github.jcustenborder.kafka.connect.transform.xml.FromXml$Value",
  "transforms.FromXml.schema.path" : "file:///books.xsd"
}

Configuration Properties

The Binary File Source connector can be configured using a variety of configuration properties.

General

The following are general configuration properties.

topic

The Kafka topic to write the data to.

  • Importance: HIGH
  • Type: STRING

batch.size

The number of records that should be returned with each batch.

  • Importance: LOW
  • Type: INT
  • Default value: 1000

empty.poll.wait.ms

The amount of time, in milliseconds (ms), to wait if a poll returns an empty list of records.

  • Importance: LOW
  • Type: LONG
  • Default value: 500
  • Valid values: [1,…,9223372036854775807]

task.count

An internal setting used by the connector to tell a task which files to select. The connector will override this setting.

  • Importance: LOW
  • Type: INT
  • Default value: 1
  • Valid values: [1,…]

task.index

An internal setting used by the Spool Dir Connector to select which files are processed by each task. The connector will override this setting.

  • Importance: LOW
  • Type: INT
  • Default value: 0
  • Valid values: [0,…]

File System

Use the following properties to configure your file system.

error.path

The directory in which to place files that have errors. This directory must exist and be writable by the user running Kafka Connect.

  • Importance: HIGH
  • Type: STRING
  • Valid values: Absolute path to a directory that exists and is writable.

input.file.pattern

Regular expression to check input filenames against. This expression must match the entire filename, which is equivalent to Matcher.matches().

  • Importance: HIGH
  • Type: STRING
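
The difference between matching the entire filename and matching part of it can be sketched with the standard java.util.regex classes. The class and method names below are hypothetical and the code is not taken from the connector. It only contrasts Matcher.matches() with Matcher.find().

```java
import java.util.regex.Pattern;

final class InputFilePatternSketch {
    // Whole-name match, the behavior described for input.file.pattern.
    static boolean matchesEntireName(String regex, String fileName) {
        return Pattern.compile(regex).matcher(fileName).matches();
    }

    // Substring match, shown only for contrast.
    static boolean matchesAnywhere(String regex, String fileName) {
        return Pattern.compile(regex).matcher(fileName).find();
    }
}
```

With the pattern users\d+ (written "users\\d+" in a Java or JSON string), the name users1.bin passes the substring check but fails the whole-name check, so the pattern must also cover the extension, as ^users\d+\.bin$ does.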

input.path

The directory from which files to be processed are read. This directory must exist and be writable by the user running Kafka Connect.

  • Importance: HIGH
  • Type: STRING
  • Valid values: Absolute path to a directory that exists and is writable.

finished.path

The directory in which to place files that have been processed. This directory must exist and be writable by the user running Kafka Connect.

  • Importance: HIGH
  • Type: STRING

halt.on.error

Determines whether the task should halt when it encounters an error or continue to the next file.

  • Importance: HIGH
  • Type: BOOLEAN
  • Default value: true

cleanup.policy

Determines how the connector should clean up files that have been processed. NONE leaves the files in place, which causes them to be reprocessed if the connector is restarted. DELETE removes the file from the filesystem. MOVE moves the file to the finished directory. MOVEBYDATE moves the file to the finished directory, into subdirectories organized by date.

  • Importance: MEDIUM
  • Type: STRING
  • Default value: MOVE
  • Valid values: NONE, DELETE, MOVE, MOVEBYDATE
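
The four policies can be sketched as a function that computes where a processed file ends up. This is an illustration under stated assumptions, not the connector's code: the class and method names are hypothetical, and the yyyy-MM-dd subdirectory name used for MOVEBYDATE is an assumption, since the layout of the date subdirectories is not specified here.

```java
import java.nio.file.Path;
import java.time.LocalDate;
import java.util.Optional;

final class CleanupPolicySketch {
    enum Policy { NONE, DELETE, MOVE, MOVEBYDATE }

    // Returns the file's location after cleanup, or empty if it was deleted.
    static Optional<Path> locationAfterCleanup(
            Policy policy, Path file, Path finishedDir, LocalDate date) {
        switch (policy) {
            case NONE:
                // File stays where it is and is picked up again after a restart.
                return Optional.of(file);
            case DELETE:
                return Optional.empty();
            case MOVE:
                return Optional.of(finishedDir.resolve(file.getFileName()));
            case MOVEBYDATE:
                // Assumed layout: <finished.path>/<yyyy-MM-dd>/<file name>.
                return Optional.of(
                    finishedDir.resolve(date.toString()).resolve(file.getFileName()));
            default:
                throw new IllegalArgumentException("Unknown policy: " + policy);
        }
    }
}
```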

task.partitioner

The task partitioner implementation is used when the connector is configured to use more than one task. Each task uses it to identify which files that task will process, which ensures that each file is assigned to only one task.

  • Importance: MEDIUM
  • Type: STRING
  • Default value: ByName
  • Valid values: ByName
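
One way a name-based partitioner can give each file exactly one owner is to hash the file name and compare the result, modulo task.count, with task.index. The sketch below assumes String.hashCode() as the hash function. The hash the connector actually uses is not stated here, and the class and method names are hypothetical.

```java
final class ByNamePartitionerSketch {
    // True when the task with this index owns the file. Math.floorMod keeps
    // the result in [0, taskCount) even when hashCode() is negative.
    static boolean isAssigned(String fileName, int taskCount, int taskIndex) {
        return Math.floorMod(fileName.hashCode(), taskCount) == taskIndex;
    }
}
```

Because every task evaluates the same deterministic function with its own index, no coordination between tasks is needed to avoid processing a file twice.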

file.buffer.size.bytes

The size, in bytes, of the buffer for the BufferedInputStream that will be used to interact with the file system.

  • Importance: LOW
  • Type: INT
  • Default value: 131072
  • Valid values: [1,…]

file.minimum.age.ms

The minimum amount of time, in milliseconds (ms), that must elapse after the file was last written to before the file can be processed.

  • Importance: LOW
  • Type: LONG
  • Default value: 0
  • Valid values: [0,…]
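
The age check amounts to comparing the time since the last write with the configured minimum. The sketch below uses hypothetical names and assumes the boundary is inclusive, which is not stated here.

```java
final class MinimumAgeSketch {
    // True when at least minimumAgeMs has elapsed since the last write.
    // The inclusive comparison (>=) is an assumption.
    static boolean isOldEnough(long lastModifiedMs, long nowMs, long minimumAgeMs) {
        return nowMs - lastModifiedMs >= minimumAgeMs;
    }
}
```

With the default of 0, every file qualifies immediately. A larger value gives a slow writer time to finish before the file is read.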

files.sort.attributes

The attributes used to determine the order in which files are processed. Name is the name of the file. Length is the length of the file, preferring larger files first. LastModified is the LastModified attribute of the file, preferring older files first.

  • Importance: LOW
  • Type: LIST
  • Default value: [NameAsc]
  • Valid values: NameAsc, NameDesc, LengthAsc, LengthDesc, LastModifiedAsc, LastModifiedDesc
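
A list of sort attributes can be read as a chain of comparators, where each later attribute breaks ties left by the earlier ones. The sketch below is an interpretation under that assumption. The FileInfo holder and the method names are hypothetical and do not come from the connector.

```java
import java.util.Comparator;
import java.util.List;

final class FileSortSketch {
    // Hypothetical holder for the three attributes a file can be sorted by.
    static final class FileInfo {
        final String name;
        final long length;
        final long lastModified;

        FileInfo(String name, long length, long lastModified) {
            this.name = name;
            this.length = length;
            this.lastModified = lastModified;
        }
    }

    // Maps one configured value to a comparator.
    static Comparator<FileInfo> forAttribute(String attribute) {
        switch (attribute) {
            case "NameAsc":
                return Comparator.comparing((FileInfo f) -> f.name);
            case "NameDesc":
                return Comparator.comparing((FileInfo f) -> f.name).reversed();
            case "LengthAsc":
                return Comparator.comparingLong((FileInfo f) -> f.length);
            case "LengthDesc":
                return Comparator.comparingLong((FileInfo f) -> f.length).reversed();
            case "LastModifiedAsc":
                return Comparator.comparingLong((FileInfo f) -> f.lastModified);
            case "LastModifiedDesc":
                return Comparator.comparingLong((FileInfo f) -> f.lastModified).reversed();
            default:
                throw new IllegalArgumentException("Unknown sort attribute: " + attribute);
        }
    }

    // Chains the attributes in order; later ones only break ties.
    static Comparator<FileInfo> forAttributes(List<String> attributes) {
        Comparator<FileInfo> result = (a, b) -> 0;
        for (String attribute : attributes) {
            result = result.thenComparing(forAttribute(attribute));
        }
        return result;
    }
}
```

For example, the list LengthDesc, NameAsc would place the largest files first and order files of equal length alphabetically.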

processing.file.extension

Before a file is processed, a flag is created in its directory to indicate the file is being handled. The flag file has the same name as the file, but with this property appended as a suffix.

  • Importance: LOW
  • Type: STRING
  • Default value: .PROCESSING
  • Valid values: regex( ^.*\..+$ )
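
The flag file name described above can be derived by appending the configured extension to the input file's name in the same directory. This is a minimal sketch with hypothetical names. It only computes the path and does not create the flag file.

```java
import java.nio.file.Path;

final class ProcessingFlagSketch {
    // Builds the flag path next to the input file, for example
    // /tmp/users1.bin -> /tmp/users1.bin.PROCESSING with the default extension.
    static Path flagFileFor(Path inputFile, String processingFileExtension) {
        String flagName = inputFile.getFileName().toString() + processingFileExtension;
        return inputFile.resolveSibling(flagName);
    }
}
```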

Timestamps

Use the following properties for configuring the timestamp.

timestamp.mode

Determines how the connector will set the timestamp for the [ConnectRecord](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/connector/ConnectRecord.html#timestamp()). If set to FIELD, the timestamp is read from a field in the value. This field cannot be optional and must be a [Timestamp](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.html). Specify the field in timestamp.field. If set to FILE_TIME, the last modified time of the file is used. If set to PROCESS_TIME, the time the record is read is used.

  • Importance: MEDIUM
  • Type: STRING
  • Default value: PROCESS_TIME
  • Valid values: FIELD, FILE_TIME, PROCESS_TIME
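
The three modes can be summarized as a selection among three candidate timestamps. The sketch below uses hypothetical names and plain epoch-millisecond values rather than Kafka Connect types. It illustrates the selection logic only and is not the connector's implementation.

```java
final class TimestampModeSketch {
    enum Mode { FIELD, FILE_TIME, PROCESS_TIME }

    // Picks the record timestamp (epoch milliseconds) for the given mode.
    // fieldValueMs stands in for the value of the field named by
    // timestamp.field and may be null when no such value is available.
    static long resolveTimestamp(
            Mode mode, Long fieldValueMs, long fileLastModifiedMs, long processTimeMs) {
        switch (mode) {
            case FIELD:
                if (fieldValueMs == null) {
                    // The field cannot be optional, so a missing value is an error.
                    throw new IllegalArgumentException("FIELD mode requires a timestamp value");
                }
                return fieldValueMs;
            case FILE_TIME:
                return fileLastModifiedMs;
            case PROCESS_TIME:
                return processTimeMs;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```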