Line-Delimited Source Connector for Confluent Platform

This connector reads a file line by line and writes the data to Apache Kafka®.

To use this connector, specify the name of the connector class in the connector.class configuration property:

connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector

Important

The recommended converter is the StringConverter. For example: value.converter=org.apache.kafka.connect.storage.StringConverter

The other connector-specific configuration properties are described below.
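
As a rough sketch of how the two properties above fit together, the following Python snippet assembles a request body in the shape that the Kafka Connect REST API accepts for `POST /connectors` (a `name` plus a `config` map). The connector name, the topic, and the input path are hypothetical placeholders, and the resulting configuration is not complete; it only illustrates where connector.class and value.converter go.

```python
import json

# Class name and converter are taken from the text above.
CONNECTOR_CLASS = (
    "com.github.jcustenborder.kafka.connect.spooldir."
    "SpoolDirLineDelimitedSourceConnector"
)
STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter"


def build_connector_request(name, extra_config):
    """Return a JSON-serializable body for a Kafka Connect POST /connectors call."""
    config = {
        "connector.class": CONNECTOR_CLASS,
        "value.converter": STRING_CONVERTER,
    }
    # Connector-specific properties; keys given here override the defaults above.
    config.update(extra_config)
    return {"name": name, "config": config}


request_body = build_connector_request(
    "line-delimited-source",                   # hypothetical connector name
    {"topic": "lines", "input.path": "/tmp"},  # hypothetical, incomplete values
)
print(json.dumps(request_body, indent=2))
```

The snippet only builds and prints the body; sending it to a Connect worker is left out so that the sketch runs without a cluster.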

FIX Encoded Lines Example

This example reads files in a directory line by line and parses each line with kafka-connect-transform-fix into a FIX representation of the data.
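
To make the idea of "one FIX message per line" concrete, here is a small Python sketch that splits a tag=value line into named fields. It is not the implementation used by kafka-connect-transform-fix. It assumes the standard SOH (0x01) field delimiter, uses a hand-written mapping for a few tags, and parses a hypothetical, shortened message. The tag numbers correspond to the "fix.field" parameters shown in the output schema of this example.

```python
SOH = "\x01"  # standard FIX field delimiter (assumed for the input files)

# Hand-written mapping for a few FIX 4.2 tags; illustrative only.
FIELD_NAMES = {"11": "ClOrdID", "21": "HandlInst", "54": "Side", "55": "Symbol"}


def parse_fix_line(line):
    """Split one tag=value FIX line into a dict keyed by field name."""
    fields = {}
    for pair in line.rstrip("\r\n").split(SOH):
        if not pair:
            continue  # skip the empty piece after a trailing delimiter
        tag, _, value = pair.partition("=")
        # Unknown tags keep their numeric tag as the key.
        fields[FIELD_NAMES.get(tag, tag)] = value
    return fields


# Hypothetical, shortened message: a real FIX message also carries
# BodyLength (9) and CheckSum (10), which are omitted here.
sample = SOH.join(["8=FIX.4.2", "35=D", "11=NF 0542/03232009", "21=1"]) + SOH
parsed = parse_fix_line(sample)
print(parsed)
```

The actual transform additionally produces a typed Connect schema (for example FLOAT64 for Price and a Timestamp for TransactTime), which this sketch does not attempt.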

Configuration
{
  "connector.class" : "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector",
  "topic" : "fix",
  "input.path" : "/tmp",
  "input.file.pattern" : "^.+\\.fix$",
  "error.path" : "/tmp",
  "finished.path" : "/tmp",
  "transforms" : "fromFix",
  "transforms.fromFix.type" : "com.github.jcustenborder.kafka.connect.transform.fix.FromFIX$Value"
}
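
The input.file.pattern value above is a regular expression, and the doubled backslash is JSON escaping for a single one. The following Python sketch shows which file names such a pattern would select. The file names are hypothetical, and Python's `re` module stands in for the Java regular expression engine used by the connector; for a simple anchored pattern like this one the two are expected to agree.

```python
import json
import re

# The pattern exactly as written in the JSON configuration; decoding the
# JSON string turns the doubled backslash into a single backslash.
pattern_text = json.loads(r'"^.+\\.fix$"')
pattern = re.compile(pattern_text)

# Hypothetical file names in the input directory.
candidates = ["orders.fix", "orders.fix.bak", ".fix", "trades.FIX", "a.fix"]

# Keep only names the whole pattern matches.
matched = [name for name in candidates if pattern.fullmatch(name)]
print(matched)
```

Names need at least one character before the `.fix` suffix, and the match is case-sensitive, so only `orders.fix` and `a.fix` are selected here.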

The connector emits data similar to the following.

Output
{
  "sourcePartition" : { },
  "sourceOffset" : { },
  "topic" : "fix",
  "kafkaPartition" : 0,
  "valueSchema" : {
    "name" : "fix42.NewOrderSingle",
    "type" : "STRUCT",
    "isOptional" : false,
    "fieldSchemas" : {
      "Account" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "1"
        },
        "isOptional" : true
      },
      "CashOrderQty" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "152"
        },
        "isOptional" : true
      },
      "CheckSum" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "10"
        },
        "isOptional" : true
      },
      "ClOrdID" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "11"
        },
        "isOptional" : true
      },
      "ClearingAccount" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "440"
        },
        "isOptional" : true
      },
      "ClearingFirm" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "439"
        },
        "isOptional" : true
      },
      "ClientID" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "109"
        },
        "isOptional" : true
      },
      "CommType" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "13"
        },
        "isOptional" : true
      },
      "Commission" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "12"
        },
        "isOptional" : true
      },
      "ComplianceID" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "376"
        },
        "isOptional" : true
      },
      "ContractMultiplier" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "231"
        },
        "isOptional" : true
      },
      "CouponRate" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "223"
        },
        "isOptional" : true
      },
      "CoveredOrUncovered" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "203"
        },
        "isOptional" : true
      },
      "Currency" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "15"
        },
        "isOptional" : true
      },
      "CustomerOrFirm" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "204"
        },
        "isOptional" : true
      },
      "DiscretionInst" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "388"
        },
        "isOptional" : true
      },
      "DiscretionOffset" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "389"
        },
        "isOptional" : true
      },
      "EffectiveTime" : {
        "name" : "org.apache.kafka.connect.data.Timestamp",
        "type" : "INT64",
        "version" : 1,
        "parameters" : {
          "fix.field" : "168"
        },
        "isOptional" : true
      },
      "EncodedIssuer" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "349"
        },
        "isOptional" : true
      },
      "EncodedIssuerLen" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "348"
        },
        "isOptional" : true
      },
      "EncodedSecurityDesc" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "351"
        },
        "isOptional" : true
      },
      "EncodedSecurityDescLen" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "350"
        },
        "isOptional" : true
      },
      "EncodedText" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "355"
        },
        "isOptional" : true
      },
      "EncodedTextLen" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "354"
        },
        "isOptional" : true
      },
      "ExDestination" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "100"
        },
        "isOptional" : true
      },
      "ExecBroker" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "76"
        },
        "isOptional" : true
      },
      "ExecInst" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "18"
        },
        "isOptional" : true
      },
      "ExpireDate" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "432"
        },
        "isOptional" : true
      },
      "ExpireTime" : {
        "name" : "org.apache.kafka.connect.data.Timestamp",
        "type" : "INT64",
        "version" : 1,
        "parameters" : {
          "fix.field" : "126"
        },
        "isOptional" : true
      },
      "ForexReq" : {
        "type" : "BOOLEAN",
        "parameters" : {
          "fix.field" : "121"
        },
        "isOptional" : true
      },
      "FutSettDate" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "64"
        },
        "isOptional" : true
      },
      "FutSettDate2" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "193"
        },
        "isOptional" : true
      },
      "GTBookingInst" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "427"
        },
        "isOptional" : true
      },
      "HandlInst" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "21"
        },
        "isOptional" : true
      },
      "IDSource" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "22"
        },
        "isOptional" : true
      },
      "IOIID" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "23"
        },
        "isOptional" : true
      },
      "Issuer" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "106"
        },
        "isOptional" : true
      },
      "LocateReqd" : {
        "type" : "BOOLEAN",
        "parameters" : {
          "fix.field" : "114"
        },
        "isOptional" : true
      },
      "MaturityDay" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "205"
        },
        "isOptional" : true
      },
      "MaturityMonthYear" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "200"
        },
        "isOptional" : true
      },
      "MaxFloor" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "111"
        },
        "isOptional" : true
      },
      "MaxShow" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "210"
        },
        "isOptional" : true
      },
      "MinQty" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "110"
        },
        "isOptional" : true
      },
      "NoAllocs" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "78"
        },
        "isOptional" : true
      },
      "NoTradingSessions" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "386"
        },
        "isOptional" : true
      },
      "OpenClose" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "77"
        },
        "isOptional" : true
      },
      "OptAttribute" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "206"
        },
        "isOptional" : true
      },
      "OrdType" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "40"
        },
        "isOptional" : true
      },
      "OrderQty" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "38"
        },
        "isOptional" : true
      },
      "OrderQty2" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "192"
        },
        "isOptional" : true
      },
      "PegDifference" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "211"
        },
        "isOptional" : true
      },
      "PrevClosePx" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "140"
        },
        "isOptional" : true
      },
      "Price" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "44"
        },
        "isOptional" : true
      },
      "ProcessCode" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "81"
        },
        "isOptional" : true
      },
      "PutOrCall" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "201"
        },
        "isOptional" : true
      },
      "QuoteID" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "117"
        },
        "isOptional" : true
      },
      "Rule80A" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "47"
        },
        "isOptional" : true
      },
      "SecurityDesc" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "107"
        },
        "isOptional" : true
      },
      "SecurityExchange" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "207"
        },
        "isOptional" : true
      },
      "SecurityID" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "48"
        },
        "isOptional" : true
      },
      "SecurityType" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "167"
        },
        "isOptional" : true
      },
      "SettlCurrency" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "120"
        },
        "isOptional" : true
      },
      "SettlmntTyp" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "63"
        },
        "isOptional" : true
      },
      "Side" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "54"
        },
        "isOptional" : true
      },
      "Signature" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "89"
        },
        "isOptional" : true
      },
      "SignatureLength" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "93"
        },
        "isOptional" : true
      },
      "SolicitedFlag" : {
        "type" : "BOOLEAN",
        "parameters" : {
          "fix.field" : "377"
        },
        "isOptional" : true
      },
      "StopPx" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "99"
        },
        "isOptional" : true
      },
      "StrikePrice" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "202"
        },
        "isOptional" : true
      },
      "Symbol" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "55"
        },
        "isOptional" : true
      },
      "SymbolSfx" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "65"
        },
        "isOptional" : true
      },
      "Text" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "58"
        },
        "isOptional" : true
      },
      "TimeInForce" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "59"
        },
        "isOptional" : true
      },
      "TransactTime" : {
        "name" : "org.apache.kafka.connect.data.Timestamp",
        "type" : "INT64",
        "version" : 1,
        "parameters" : {
          "fix.field" : "60"
        },
        "isOptional" : true
      }
    }
  },
  "value" : {
    "schema" : {
      "name" : "fix42.NewOrderSingle",
      "type" : "STRUCT",
      "isOptional" : false,
      "fieldSchemas" : {
        "Account" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "1"
          },
          "isOptional" : true
        },
        "CashOrderQty" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "152"
          },
          "isOptional" : true
        },
        "CheckSum" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "10"
          },
          "isOptional" : true
        },
        "ClOrdID" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "11"
          },
          "isOptional" : true
        },
        "ClearingAccount" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "440"
          },
          "isOptional" : true
        },
        "ClearingFirm" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "439"
          },
          "isOptional" : true
        },
        "ClientID" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "109"
          },
          "isOptional" : true
        },
        "CommType" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "13"
          },
          "isOptional" : true
        },
        "Commission" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "12"
          },
          "isOptional" : true
        },
        "ComplianceID" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "376"
          },
          "isOptional" : true
        },
        "ContractMultiplier" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "231"
          },
          "isOptional" : true
        },
        "CouponRate" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "223"
          },
          "isOptional" : true
        },
        "CoveredOrUncovered" : {
          "type" : "INT32",
          "parameters" : {
            "fix.field" : "203"
          },
          "isOptional" : true
        },
        "Currency" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "15"
          },
          "isOptional" : true
        },
        "CustomerOrFirm" : {
          "type" : "INT32",
          "parameters" : {
            "fix.field" : "204"
          },
          "isOptional" : true
        },
        "DiscretionInst" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "388"
          },
          "isOptional" : true
        },
        "DiscretionOffset" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "389"
          },
          "isOptional" : true
        },
        "EffectiveTime" : {
          "name" : "org.apache.kafka.connect.data.Timestamp",
          "type" : "INT64",
          "version" : 1,
          "parameters" : {
            "fix.field" : "168"
          },
          "isOptional" : true
        },
        "EncodedIssuer" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "349"
          },
          "isOptional" : true
        },
        "EncodedIssuerLen" : {
          "type" : "INT32",
          "parameters" : {
            "fix.field" : "348"
          },
          "isOptional" : true
        },
        "EncodedSecurityDesc" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "351"
          },
          "isOptional" : true
        },
        "EncodedSecurityDescLen" : {
          "type" : "INT32",
          "parameters" : {
            "fix.field" : "350"
          },
          "isOptional" : true
        },
        "EncodedText" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "355"
          },
          "isOptional" : true
        },
        "EncodedTextLen" : {
          "type" : "INT32",
          "parameters" : {
            "fix.field" : "354"
          },
          "isOptional" : true
        },
        "ExDestination" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "100"
          },
          "isOptional" : true
        },
        "ExecBroker" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "76"
          },
          "isOptional" : true
        },
        "ExecInst" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "18"
          },
          "isOptional" : true
        },
        "ExpireDate" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "432"
          },
          "isOptional" : true
        },
        "ExpireTime" : {
          "name" : "org.apache.kafka.connect.data.Timestamp",
          "type" : "INT64",
          "version" : 1,
          "parameters" : {
            "fix.field" : "126"
          },
          "isOptional" : true
        },
        "ForexReq" : {
          "type" : "BOOLEAN",
          "parameters" : {
            "fix.field" : "121"
          },
          "isOptional" : true
        },
        "FutSettDate" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "64"
          },
          "isOptional" : true
        },
        "FutSettDate2" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "193"
          },
          "isOptional" : true
        },
        "GTBookingInst" : {
          "type" : "INT32",
          "parameters" : {
            "fix.field" : "427"
          },
          "isOptional" : true
        },
        "HandlInst" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "21"
          },
          "isOptional" : true
        },
        "IDSource" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "22"
          },
          "isOptional" : true
        },
        "IOIID" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "23"
          },
          "isOptional" : true
        },
        "Issuer" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "106"
          },
          "isOptional" : true
        },
        "LocateReqd" : {
          "type" : "BOOLEAN",
          "parameters" : {
            "fix.field" : "114"
          },
          "isOptional" : true
        },
        "MaturityDay" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "205"
          },
          "isOptional" : true
        },
        "MaturityMonthYear" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "200"
          },
          "isOptional" : true
        },
        "MaxFloor" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "111"
          },
          "isOptional" : true
        },
        "MaxShow" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "210"
          },
          "isOptional" : true
        },
        "MinQty" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "110"
          },
          "isOptional" : true
        },
        "NoAllocs" : {
          "type" : "INT32",
          "parameters" : {
            "fix.field" : "78"
          },
          "isOptional" : true
        },
        "NoTradingSessions" : {
          "type" : "INT32",
          "parameters" : {
            "fix.field" : "386"
          },
          "isOptional" : true
        },
        "OpenClose" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "77"
          },
          "isOptional" : true
        },
        "OptAttribute" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "206"
          },
          "isOptional" : true
        },
        "OrdType" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "40"
          },
          "isOptional" : true
        },
        "OrderQty" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "38"
          },
          "isOptional" : true
        },
        "OrderQty2" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "192"
          },
          "isOptional" : true
        },
        "PegDifference" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "211"
          },
          "isOptional" : true
        },
        "PrevClosePx" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "140"
          },
          "isOptional" : true
        },
        "Price" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "44"
          },
          "isOptional" : true
        },
        "ProcessCode" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "81"
          },
          "isOptional" : true
        },
        "PutOrCall" : {
          "type" : "INT32",
          "parameters" : {
            "fix.field" : "201"
          },
          "isOptional" : true
        },
        "QuoteID" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "117"
          },
          "isOptional" : true
        },
        "Rule80A" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "47"
          },
          "isOptional" : true
        },
        "SecurityDesc" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "107"
          },
          "isOptional" : true
        },
        "SecurityExchange" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "207"
          },
          "isOptional" : true
        },
        "SecurityID" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "48"
          },
          "isOptional" : true
        },
        "SecurityType" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "167"
          },
          "isOptional" : true
        },
        "SettlCurrency" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "120"
          },
          "isOptional" : true
        },
        "SettlmntTyp" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "63"
          },
          "isOptional" : true
        },
        "Side" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "54"
          },
          "isOptional" : true
        },
        "Signature" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "89"
          },
          "isOptional" : true
        },
        "SignatureLength" : {
          "type" : "INT32",
          "parameters" : {
            "fix.field" : "93"
          },
          "isOptional" : true
        },
        "SolicitedFlag" : {
          "type" : "BOOLEAN",
          "parameters" : {
            "fix.field" : "377"
          },
          "isOptional" : true
        },
        "StopPx" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "99"
          },
          "isOptional" : true
        },
        "StrikePrice" : {
          "type" : "FLOAT64",
          "parameters" : {
            "fix.field" : "202"
          },
          "isOptional" : true
        },
        "Symbol" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "55"
          },
          "isOptional" : true
        },
        "SymbolSfx" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "65"
          },
          "isOptional" : true
        },
        "Text" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "58"
          },
          "isOptional" : true
        },
        "TimeInForce" : {
          "type" : "STRING",
          "parameters" : {
            "fix.field" : "59"
          },
          "isOptional" : true
        },
        "TransactTime" : {
          "name" : "org.apache.kafka.connect.data.Timestamp",
          "type" : "INT64",
          "version" : 1,
          "parameters" : {
            "fix.field" : "60"
          },
          "isOptional" : true
        }
      }
    },
    "fieldValues" : [ {
      "name" : "Account",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "1"
        },
        "isOptional" : true
      }
    }, {
      "name" : "CashOrderQty",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "152"
        },
        "isOptional" : true
      }
    }, {
      "name" : "CheckSum",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "10"
        },
        "isOptional" : true
      }
    }, {
      "name" : "ClOrdID",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "11"
        },
        "isOptional" : true
      },
      "storage" : "NF 0542/03232009"
    }, {
      "name" : "ClearingAccount",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "440"
        },
        "isOptional" : true
      }
    }, {
      "name" : "ClearingFirm",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "439"
        },
        "isOptional" : true
      }
    }, {
      "name" : "ClientID",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "109"
        },
        "isOptional" : true
      }
    }, {
      "name" : "CommType",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "13"
        },
        "isOptional" : true
      }
    }, {
      "name" : "Commission",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "12"
        },
        "isOptional" : true
      }
    }, {
      "name" : "ComplianceID",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "376"
        },
        "isOptional" : true
      }
    }, {
      "name" : "ContractMultiplier",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "231"
        },
        "isOptional" : true
      }
    }, {
      "name" : "CouponRate",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "223"
        },
        "isOptional" : true
      }
    }, {
      "name" : "CoveredOrUncovered",
      "schema" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "203"
        },
        "isOptional" : true
      }
    }, {
      "name" : "Currency",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "15"
        },
        "isOptional" : true
      }
    }, {
      "name" : "CustomerOrFirm",
      "schema" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "204"
        },
        "isOptional" : true
      }
    }, {
      "name" : "DiscretionInst",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "388"
        },
        "isOptional" : true
      }
    }, {
      "name" : "DiscretionOffset",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "389"
        },
        "isOptional" : true
      }
    }, {
      "name" : "EffectiveTime",
      "schema" : {
        "name" : "org.apache.kafka.connect.data.Timestamp",
        "type" : "INT64",
        "version" : 1,
        "parameters" : {
          "fix.field" : "168"
        },
        "isOptional" : true
      }
    }, {
      "name" : "EncodedIssuer",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "349"
        },
        "isOptional" : true
      }
    }, {
      "name" : "EncodedIssuerLen",
      "schema" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "348"
        },
        "isOptional" : true
      }
    }, {
      "name" : "EncodedSecurityDesc",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "351"
        },
        "isOptional" : true
      }
    }, {
      "name" : "EncodedSecurityDescLen",
      "schema" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "350"
        },
        "isOptional" : true
      }
    }, {
      "name" : "EncodedText",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "355"
        },
        "isOptional" : true
      }
    }, {
      "name" : "EncodedTextLen",
      "schema" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "354"
        },
        "isOptional" : true
      }
    }, {
      "name" : "ExDestination",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "100"
        },
        "isOptional" : true
      }
    }, {
      "name" : "ExecBroker",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "76"
        },
        "isOptional" : true
      }
    }, {
      "name" : "ExecInst",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "18"
        },
        "isOptional" : true
      }
    }, {
      "name" : "ExpireDate",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "432"
        },
        "isOptional" : true
      }
    }, {
      "name" : "ExpireTime",
      "schema" : {
        "name" : "org.apache.kafka.connect.data.Timestamp",
        "type" : "INT64",
        "version" : 1,
        "parameters" : {
          "fix.field" : "126"
        },
        "isOptional" : true
      }
    }, {
      "name" : "ForexReq",
      "schema" : {
        "type" : "BOOLEAN",
        "parameters" : {
          "fix.field" : "121"
        },
        "isOptional" : true
      }
    }, {
      "name" : "FutSettDate",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "64"
        },
        "isOptional" : true
      }
    }, {
      "name" : "FutSettDate2",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "193"
        },
        "isOptional" : true
      }
    }, {
      "name" : "GTBookingInst",
      "schema" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "427"
        },
        "isOptional" : true
      }
    }, {
      "name" : "HandlInst",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "21"
        },
        "isOptional" : true
      },
      "storage" : "1"
    }, {
      "name" : "IDSource",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "22"
        },
        "isOptional" : true
      }
    }, {
      "name" : "IOIID",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "23"
        },
        "isOptional" : true
      }
    }, {
      "name" : "Issuer",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "106"
        },
        "isOptional" : true
      }
    }, {
      "name" : "LocateReqd",
      "schema" : {
        "type" : "BOOLEAN",
        "parameters" : {
          "fix.field" : "114"
        },
        "isOptional" : true
      }
    }, {
      "name" : "MaturityDay",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "205"
        },
        "isOptional" : true
      }
    }, {
      "name" : "MaturityMonthYear",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "200"
        },
        "isOptional" : true
      }
    }, {
      "name" : "MaxFloor",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "111"
        },
        "isOptional" : true
      }
    }, {
      "name" : "MaxShow",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "210"
        },
        "isOptional" : true
      }
    }, {
      "name" : "MinQty",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "110"
        },
        "isOptional" : true
      }
    }, {
      "name" : "NoAllocs",
      "schema" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "78"
        },
        "isOptional" : true
      }
    }, {
      "name" : "NoTradingSessions",
      "schema" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "386"
        },
        "isOptional" : true
      }
    }, {
      "name" : "OpenClose",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "77"
        },
        "isOptional" : true
      }
    }, {
      "name" : "OptAttribute",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "206"
        },
        "isOptional" : true
      }
    }, {
      "name" : "OrdType",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "40"
        },
        "isOptional" : true
      },
      "storage" : "1"
    }, {
      "name" : "OrderQty",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "38"
        },
        "isOptional" : true
      },
      "storage" : 100.0
    }, {
      "name" : "OrderQty2",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "192"
        },
        "isOptional" : true
      }
    }, {
      "name" : "PegDifference",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "211"
        },
        "isOptional" : true
      }
    }, {
      "name" : "PrevClosePx",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "140"
        },
        "isOptional" : true
      }
    }, {
      "name" : "Price",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "44"
        },
        "isOptional" : true
      }
    }, {
      "name" : "ProcessCode",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "81"
        },
        "isOptional" : true
      }
    }, {
      "name" : "PutOrCall",
      "schema" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "201"
        },
        "isOptional" : true
      }
    }, {
      "name" : "QuoteID",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "117"
        },
        "isOptional" : true
      }
    }, {
      "name" : "Rule80A",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "47"
        },
        "isOptional" : true
      },
      "storage" : "A"
    }, {
      "name" : "SecurityDesc",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "107"
        },
        "isOptional" : true
      }
    }, {
      "name" : "SecurityExchange",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "207"
        },
        "isOptional" : true
      },
      "storage" : "N"
    }, {
      "name" : "SecurityID",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "48"
        },
        "isOptional" : true
      }
    }, {
      "name" : "SecurityType",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "167"
        },
        "isOptional" : true
      }
    }, {
      "name" : "SettlCurrency",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "120"
        },
        "isOptional" : true
      }
    }, {
      "name" : "SettlmntTyp",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "63"
        },
        "isOptional" : true
      }
    }, {
      "name" : "Side",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "54"
        },
        "isOptional" : true
      },
      "storage" : "1"
    }, {
      "name" : "Signature",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "89"
        },
        "isOptional" : true
      }
    }, {
      "name" : "SignatureLength",
      "schema" : {
        "type" : "INT32",
        "parameters" : {
          "fix.field" : "93"
        },
        "isOptional" : true
      }
    }, {
      "name" : "SolicitedFlag",
      "schema" : {
        "type" : "BOOLEAN",
        "parameters" : {
          "fix.field" : "377"
        },
        "isOptional" : true
      }
    }, {
      "name" : "StopPx",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "99"
        },
        "isOptional" : true
      }
    }, {
      "name" : "StrikePrice",
      "schema" : {
        "type" : "FLOAT64",
        "parameters" : {
          "fix.field" : "202"
        },
        "isOptional" : true
      }
    }, {
      "name" : "Symbol",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "55"
        },
        "isOptional" : true
      },
      "storage" : "CVS"
    }, {
      "name" : "SymbolSfx",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "65"
        },
        "isOptional" : true
      }
    }, {
      "name" : "Text",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "58"
        },
        "isOptional" : true
      }
    }, {
      "name" : "TimeInForce",
      "schema" : {
        "type" : "STRING",
        "parameters" : {
          "fix.field" : "59"
        },
        "isOptional" : true
      },
      "storage" : "0"
    }, {
      "name" : "TransactTime",
      "schema" : {
        "name" : "org.apache.kafka.connect.data.Timestamp",
        "type" : "INT64",
        "version" : 1,
        "parameters" : {
          "fix.field" : "60"
        },
        "isOptional" : true
      },
      "storage" : 1237822829000
    } ]
  },
  "headers" : [ ]
}

Configuration Properties

General

topic

The Kafka topic to write the data to.

  • Importance: HIGH
  • Type: STRING
batch.size

The number of records that should be returned with each batch.

  • Importance: LOW
  • Type: INT
  • Default Value: 1000
empty.poll.wait.ms

The amount of time, in milliseconds, to wait if a poll returns an empty list of records.

  • Importance: LOW
  • Type: LONG
  • Default Value: 500
  • Validator: [1,…,9223372036854775807]
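
As a minimal sketch, the general settings above could appear in a connector properties file as follows. The topic name is taken from the FIX example earlier on this page; the batch.size and empty.poll.wait.ms values are arbitrary illustrations, not tuning recommendations.

```properties
# Topic that receives the data read from the input files.
topic=fix
# Number of records returned with each batch (illustrative; the default is 1000).
batch.size=500
# Wait 1000 ms after a poll that returned no records (illustrative; the default is 500).
empty.poll.wait.ms=1000
```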

Metadata

metadata.field

The name of the field in the value where the metadata will be stored.

  • Importance: LOW
  • Type: STRING
  • Default Value: metadata
metadata.location

The location where metadata about the input file is stored. FIELD: the metadata is stored in a field in the value of the record (see metadata.field). HEADERS: the metadata is stored as headers on the record. NONE: no metadata about the input file is stored.

  • Importance: LOW
  • Type: STRING
  • Default Value: HEADERS
  • Validator: Matches: NONE, HEADERS, FIELD

File System

error.path

The directory in which to place files that have errors. This directory must exist and be writable by the user running Kafka Connect.

  • Importance: HIGH
  • Type: STRING
  • Validator: Absolute path to a directory that exists and is writable.
input.file.pattern

The regular expression to check input file names against. The expression must match the entire file name; this is the equivalent of Matcher.matches().

  • Importance: HIGH
  • Type: STRING
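
Because the expression has to match the whole file name, a pattern that only describes part of the name will not select the file. The sketch below reuses the pattern from the FIX example on this page, written for a properties file; the file names in the comments are hypothetical and only illustrate the full-match rule.

```properties
# Backslashes are doubled because "\" is the escape character in a properties file;
# the connector receives the regular expression ^.+\.fix$
input.file.pattern=^.+\\.fix$
# Hypothetical file names and how the full-match rule treats them:
#   orders.fix       accepted
#   orders.fix.bak   rejected (text follows ".fix")
#   .fix             rejected (".+" needs at least one character before ".fix")
```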
input.path

The directory from which Kafka Connect reads the files to process. This directory must exist and be writable by the user running Connect.

  • Importance: HIGH
  • Type: STRING
  • Validator: Absolute path to a directory that exists and is writable.
finished.path

The directory where Connect puts files that are successfully processed. This directory must exist and be writable by the user running Connect.

  • Importance: HIGH
  • Type: STRING
halt.on.error

Sets whether the task halts when it encounters an error or continues to the next file.

  • Importance: HIGH
  • Type: BOOLEAN
  • Default Value: true
cleanup.policy

Determines how the connector cleans up files that have been successfully processed. NONE leaves the files in place, which could cause them to be reprocessed if the connector is restarted. DELETE removes the file from the file system. MOVE moves the file to a finished directory. MOVEBYDATE moves the file to a finished directory with subdirectories by date.

  • Importance: MEDIUM
  • Type: STRING
  • Default Value: MOVE
  • Validator: Matches: NONE, DELETE, MOVE, MOVEBYDATE
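
A minimal sketch of a non-default cleanup setup is shown below. The directory names are hypothetical placeholders; substitute directories that exist on the Connect worker.

```properties
# Move each successfully processed file to a finished directory with subdirectories by date.
cleanup.policy=MOVEBYDATE
# Hypothetical directories; both must already exist and be writable by the user running Connect.
finished.path=/var/spool/connect/finished
error.path=/var/spool/connect/error
# Stop the task on the first error instead of continuing to the next file (this is the default).
halt.on.error=true
```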
task.partitioner

The task partitioner implementation to use when the connector is configured with more than one task. Each task uses it to identify which files it will process, which ensures that each file is assigned to only one task.

  • Importance: MEDIUM
  • Type: STRING
  • Default Value: ByName
  • Validator: Matches: ByName
file.buffer.size.bytes

The size, in bytes, of the buffer for the BufferedInputStream that is used to interact with the file system.

  • Importance: LOW
  • Type: INT
  • Default Value: 131072
  • Validator: [1,…]
file.minimum.age.ms

The amount of time in milliseconds after the file was last written to before the file can be processed.

  • Importance: LOW
  • Type: LONG
  • Default Value: 0
  • Validator: [0,…]
files.sort.attributes

The file attributes used to determine the sort order. Name is the name of the file. Length is the length of the file, preferring larger files first. LastModified is the LastModified attribute of the file, preferring older files first.

  • Importance: LOW
  • Type: LIST
  • Default Value: [NameAsc]
  • Validator: Matches: NameAsc, NameDesc, LengthAsc, LengthDesc, LastModifiedAsc, LastModifiedDesc
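
The following sketch shows how a LIST value might be written in a properties file, together with file.minimum.age.ms. It assumes that when several attributes are listed, later ones act as tie-breakers for earlier ones; that behavior is not stated on this page, and the age value is an arbitrary illustration.

```properties
# LIST values are comma-separated. Assumption: NameAsc only breaks ties between
# files that have the same last-modified time.
files.sort.attributes=LastModifiedAsc,NameAsc
# Only process files that were last written to at least 5000 ms ago (illustrative value),
# for example to avoid picking up a file that is still being copied into input.path.
file.minimum.age.ms=5000
```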
processing.file.extension

Before a file is processed, it is renamed to indicate that it is currently being processed. The value of this setting is appended to the end of the file name.

  • Importance: LOW
  • Type: STRING
  • Default Value: .PROCESSING
  • Validator: Matches regex( ^.*\..+$ )

Timestamps

timestamp.mode

Determines how the connector sets the timestamp for the ConnectRecord. If set to FIELD, the timestamp is read from a field in the value; this field cannot be optional and must be a Timestamp, and its name is specified in timestamp.field. If set to FILE_TIME, the time the file was last modified is used. If set to PROCESS_TIME (the default), the time the record is read is used.

  • Importance: MEDIUM
  • Type: STRING
  • Default Value: PROCESS_TIME
  • Validator: Matches: FIELD, FILE_TIME, PROCESS_TIME
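
As a short sketch, switching from the default to file-based timestamps takes a single setting. This can be useful when older files are loaded after the fact and the record timestamps should reflect when the file was written rather than when it was read.

```properties
# Use the time each file was last modified as the record timestamp.
timestamp.mode=FILE_TIME
```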