CSV Source Connector for Confluent Platform¶
このコネクターでは、ファイルの input.path
で指定されたディレクトリをモニタリングし、ファイルを CSV として読み取って、各レコードを key.schema
と value.schema
で厳密に型指定された同等なレコードに変換します。
このコネクターを使用するには、connector.class
構成プロパティでこのコネクタークラスの名前を指定します。
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector
コネクター固有の構成プロパティについて、以降で説明します。
CSV Source Connector の例¶
以降の例では、Confluent Platform と Spool Dir コネクターのインストールについてクイックスタートと同じ手順に従います。
スキーマを含む CSV の例¶
この例では、CSV ファイルを読み取って Kafka に書き込みます。key.schema
と value.schema
で指定されているスキーマを使用して、それらのファイルを解析します。
データディレクトリを作成し、テストデータを生成します。
curl "https://api.mockaroo.com/api/58605010?count=1000&key=25fd9c80" > "data/csv-spooldir-source.csv"
spooldir.properties
ファイルを以下の内容で作成します。name=CsvSchemaSpoolDir tasks.max=1 connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector input.path=/path/to/data input.file.pattern=csv-spooldir-source.csv error.path=/path/to/error finished.path=/path/to/finished halt.on.error=false topic=spooldir-testing-topic csv.first.row.as.header=true key.schema={\n \"name\" : \"com.example.users.UserKey\",\n \"type\" : \"STRUCT\",\n \"isOptional\" : false,\n \"fieldSchemas\" : {\n \"id\" : {\n \"type\" : \"INT64\",\n \"isOptional\" : false\n }\n }\n} value.schema={\n \"name\" : \"com.example.users.User\",\n \"type\" : \"STRUCT\",\n \"isOptional\" : false,\n \"fieldSchemas\" : {\n \"id\" : {\n \"type\" : \"INT64\",\n \"isOptional\" : false\n },\n \"first_name\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"last_name\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"email\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"gender\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"ip_address\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"last_login\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"account_balance\" : {\n \"name\" : \"org.apache.kafka.connect.data.Decimal\",\n \"type\" : \"BYTES\",\n \"version\" : 1,\n \"parameters\" : {\n \"scale\" : \"2\"\n },\n \"isOptional\" : true\n },\n \"country\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"favorite_color\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n }\n }\n}
SpoolDir CSV Source Connector を読み込みます。
注意
トピック名とフラグの間には、ダブルダッシュ(
--
)を入れる必要があります。詳細については、こちらの投稿 を参照してください。confluent local services connect connector load spooldir --config spooldir.properties
重要
Confluent CLI は本稼働環境では使用しないでください。
Avro を使用してシリアル化された Kafka にメッセージが送信されたことを確認します。
kafka-avro-console-consumer --topic spooldir-testing-topic --from-beginning --bootstrap-server localhost:9092
TSV 入力ファイルの例¶
以下の例では、TSV ファイルを読み込み、各レコードを Kafka に生成します。
以下のコマンドを使用して、TSV データセットを生成します。
curl "https://api.mockaroo.com/api/b10f7e90?count=1000&key=25fd9c80" > "tsv-spooldir-source.tsv"
spooldir.properties
ファイルを以下の内容で作成します。name=TsvSpoolDir tasks.max=1 connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector input.path=/path/to/data input.file.pattern=tsv-spooldir-source.tsv error.path=/path/to/error finished.path=/path/to/finished halt.on.error=false topic=spooldir-tsv-topic schema.generation.enabled=true csv.first.row.as.header=true csv.separator.char=9
SpoolDir JSON Source Connector を読み込みます。
注意
トピック名とフラグの間には、ダブルダッシュ(
--
)を入れる必要があります。詳細については、こちらの投稿 を参照してください。confluent local services connect connector load spooldir --config spooldir.properties
重要
Confluent CLI は本稼働環境では使用しないでください。
構成プロパティ¶
全般¶
topic
データの書き込み先 Kafka トピック。
- 重要度: 高
- 型: STRING
batch.size
各バッチで返されるレコード数。
- 重要度: 低
- 型: INT
- デフォルト値: 1000
empty.poll.wait.ms
ポーリングで空のレコードリストが返された場合の待ち時間。
- 重要度: 低
- 型: LONG
- デフォルト値: 500
- バリデーター: [1,…,9223372036854775807]
メタデータ¶
metadata.field
値においてメタデータが格納されるフィールドの名前。
- 重要度: 低
- 型: STRING
- デフォルト値: metadata
metadata.location
入力ファイルに関するメタデータの格納先。FIELD - 入力ファイルに関するメタデータは、レコードの値に含まれるフィールドに格納されます。HEADERS - 入力ファイルに関するメタデータは、レコードのヘッダーとして格納されます。NONE - 入力ファイルに関するメタデータは格納されません。
- 重要度: 低
- 型: STRING
- デフォルト値: HEADERS
- 指定可能な値:
NONE
、HEADERS
、FIELD
トピックの自動作成¶
トピックの自動作成の詳細については、「ソースコネクターのトピックの自動作成の構成 」を参照してください。
注釈
構成プロパティには、Java regex として定義されている正規表現(regex)を使用できます。
topic.creation.groups
一致するトピックにグループ別のトピック構成を定義するために使用するグループ別名のリスト。
default
グループは常に存在し、すべてのトピックに一致します。- 型: String 型のリスト
- デフォルト: 空
- 指定可能な値: このプロパティの値には、任意の追加グループを指定できます。トピックの構成には常に
default
グループが定義されています。
topic.creation.$alias.replication.factor
コネクターで作成する新規トピックのレプリケーション係数。この値は、Kafka クラスターのブローカーの数を超えてはなりません。この値が Kafka ブローカーの数よりも大きい場合、コネクターがトピックの作成を試行するとエラーが発生します。
default
グループの場合、これは 必須のプロパティ です。topic.creation.groups
で定義されている他のグループの場合、このプロパティは省略可能です。他のグループでは、Kafka ブローカーのデフォルト値が使用されます。- 型: int
- デフォルト: なし
- 指定可能な値 : 具体的な有効値を指定する場合は
>= 1
で指定し、Kafka ブローカーのデフォルト値を使用する場合は-1
を指定します。
topic.creation.$alias.partitions
このコネクターによって作成されるトピックのパーティションの数。
default
グループの場合、これは 必須のプロパティ です。topic.creation.groups
で定義されている他のグループの場合、このプロパティは省略可能です。他のグループでは、Kafka ブローカーのデフォルト値が使用されます。- 型: int
- デフォルト: なし
- 指定可能な値 : 具体的な有効値を指定する場合は
>= 1
で指定し、Kafka ブローカーのデフォルト値を使用する場合は-1
を指定します。
topic.creation.$alias.include
トピック名に一致する正規表現を表す文字列のリスト。このリストは、一致する値を持つトピックを対象に含め、このグループの特定の構成を一致するトピックに適用するために使用します。
topic.creation.groups
で定義されているすべてのグループに、$alias
が適用されます。default
グループには、このプロパティは適用されません。- 型: String 型のリスト
- デフォルト: 空
- 指定可能な値: 正確なトピック名または正規表現のコンマ区切りのリスト。
topic.creation.$alias.exclude
トピック名に一致する正規表現を表す文字列のリスト。このリストは、一致する値を持つトピックを、グループの特定の構成の適用から除外するために使用します。
topic.creation.groups
で定義されているすべてのグループに、$alias
が適用されます。default
グループには、このプロパティは適用されません。トピックの除外ルールは、すべての包含ルールをオーバーライドすることに注意してください。- 型: String 型のリスト
- デフォルト: 空
- 指定可能な値: 正確なトピック名または正規表現のコンマ区切りのリスト。
topic.creation.$alias.${kafkaTopicSpecificConfigName}
レコードの書き込み先の Kafka ブローカーのバージョンに対するすべての ブローカー構成の動的な変更 。ルールに対して構成が指定されていない場合、ブローカーのトピックレベルの構成値が使用されます。
default
グループ、およびtopic.creation.groups
で定義されているすべてのグループに、$alias
が適用されます。- 型: プロパティ値
- デフォルト : Kafka ブローカー値
ファイルシステム¶
error.path
エラーを含むファイルを配置するディレクトリ。このディレクトリが存在し、Kafka Connect を実行しているユーザーによる書き込みが可能である必要があります。
- 重要度: 高
- 型: STRING
- バリデーター: 存在し、書き込み可能であるディレクトリへの絶対パス。
input.file.pattern
入力ファイル名と照合する正規表現。ファイル名全体と一致する正規表現を使用してください。
Matcher.matches()
と同等。有効な構文の定義については、「Class Pattern」を参照してください。- 重要度: 高
- 型: STRING
input.path
Kafka Connect が処理対象のファイルを読み取るディレクトリ。このディレクトリが存在し、Connect を実行しているユーザーによる書き込みが可能である必要があります。
- 重要度: 高
- 型: STRING
- バリデーター: 存在し、書き込み可能であるディレクトリへの絶対パス。
finished.path
Connect が正常に処理されたファイルを配置するディレクトリ。このディレクトリが存在し、Connect を実行しているユーザーによる書き込みが可能である必要があります。
- 重要度: 高
- 型: STRING
halt.on.error
エラー発生時に、タスクを中止するか次のファイルに進むかを設定します。
- 重要度: 高
- 型: BOOLEAN
- デフォルト値: true
cleanup.policy
正常に処理されたファイルをコネクターでクリーンアップする方法を決定します。NONE を指定するとファイルがそのまま残されますが、コネクターが再起動された場合に再処理される原因になる可能性があります。DELETE を指定すると、ファイルシステムからファイルが削除されます。MOVE を指定すると、完了したファイル用のディレクトリにファイルが移動されます。MOVEBYDATE を指定すると、完了したファイル用のディレクトリに日付ごとのサブディレクトリが作成されてファイルが移動されます。
- 重要度: 中
- 型: STRING
- デフォルト値: MOVE
- 指定可能な値:
NONE
、DELETE
、MOVE
、MOVEBYDATE
task.partitioner
複数のタスクを使用するようコネクターを構成した場合は、タスクパーティショナーの実装が使用されます。タスクで処理されるファイルを特定するために各タスクで使用されます。これにより、各ファイルの割り当て先タスクが 1 つに限定されます。
- 重要度: 中
- 型: STRING
- デフォルト値: ByName
- バリデーター : 一致文字列:
ByName
file.buffer.size.bytes
BufferedInputStream のバッファサイズ。ファイルシステムとのやり取りで使用されます。
- 重要度: 低
- 型: INT
- デフォルト値: 131072
- バリデーター: [1,…]
file.minimum.age.ms
ファイルへの最後の書き込みが終わってからファイルを処理できるようになるまでの待ち時間(ミリ秒)。
- 重要度: 低
- 型: LONG
- デフォルト値: 0
- バリデーター: [0,…]
files.sort.attributes
各ファイルによってソート順序の決定に使用される属性。Name はファイルの名前です。Length はファイルの長さで、長いファイルほど先になります。LastModified はファイルの LastModified 属性で、古いファイルほど先になります。
- 重要度: 低
- 型: LIST
- デフォルト値: [NameAsc]
- バリデーター : 一致文字列:
NameAsc
、NameDesc
、LengthAsc
、LengthDesc
、LastModifiedAsc
、LastModifiedDesc
processing.file.extension
ファイルは、処理される前に、現在処理されていることを示す名前に変更されます。この設定は、ファイルの末尾に追加されます。
- 重要度: 低
- 型: STRING
- デフォルト値: .PROCESSING
- バリデーター : 一致文字列: regex( ^.*..+$ )
スキーマ¶
key.schema
Kafka に書き込まれるキーのスキーマ。
- 重要度: 高
- 型: STRING
value.schema
Kafka に書き込まれる値のスキーマ。
- 重要度: 高
- 型: STRING
スキーマ生成¶
schema.generation.enabled
スキーマが動的に生成されるかどうかを決定するフラグ。true を設定した場合は、
key.schema
とvalue.schema
を省略できますが、schema.generation.key.name
とschema.generation.value.name
は設定する必要があります。- 重要度: 中
- 型: BOOLEAN
schema.generation.key.fields
キースキーマのビルドに使用されるフィールド。これは、スキーマ生成時にのみ使用されます。
- 重要度: 中
- 型: LIST
schema.generation.key.name
生成されるキースキーマの名前。
- 重要度: 中
- 型: STRING
- デフォルト値: com.github.jcustenborder.kafka.connect.model.Key
schema.generation.value.name
生成される値スキーマの名前。
- 重要度: 中
- 型: STRING
- デフォルト値: com.github.jcustenborder.kafka.connect.model.Value
timestamp.field
値スキーマ内で、解析対象となるレコードのタイムスタンプを含むフィールド。このフィールドは、省略可能とすることはできません。また、タイムスタンプ であることが必要です。
- 重要度: 中
- 型: STRING
タイムスタンプ¶
timestamp.mode
コネクターで ConnectRecord にタイムスタンプがどのように設定されるかを決定します。
Field
を設定した場合、タイムスタンプは値のフィールドから読み取られます。このフィールドは、省略可能とすることはできません。また、タイムスタンプ であることが必要です。フィールドは、timestamp.field
で指定します。FILE_TIME
を設定した場合、ファイルの最終更新時刻が使用されます。PROCESS_TIME
(デフォルト)を設定した場合、レコードが読み取られた時刻が使用されます。- 重要度: 中
- 型: STRING
- デフォルト値: PROCESS_TIME
- 指定可能な値:
FIELD
、FILE_TIME
、PROCESS_TIME
timestamp.field
値スキーマ内で、解析対象となるレコードのタイムスタンプを含むフィールド。このフィールドは、省略可能とすることはできません。また、タイムスタンプ であることが必要です。
- 重要度: 中
- 型: STRING
parser.timestamp.date.formats
ファイルで想定される日付フォーマット。これは、日付フィールドを順番に解析するために使用される文字列のリストです。最も正確な日付フォーマットをリストの先頭にする必要があります。詳細については、Java のドキュメント を参照してください。
- 重要度: 低
- 型: LIST
- デフォルト値 : [yyyy-MM-dd'T'HH:mm:ss、yyyy-MM-dd' 'HH:mm:ss]
parser.timestamp.timezone
すべての解析済み日付に使用するタイムゾーン。
- 重要度: 低
- 型: STRING
- デフォルト値: UTC
CSV 解析¶
csv.case.sensitive.field.names
ヘッダー行のフィールド名で大文字と小文字を区別するかどうかを決定するフラグ。
- 重要度: 低
- 型: BOOLEAN
- デフォルト値: false
csv.rfc.4180.parser.enabled
RFC 4180 パーサーをデフォルトパーサーの代わりに使用するかどうかを決定するフラグ。
- 重要度: 低
- 型: BOOLEAN
- デフォルト値: false
csv.first.row.as.header
データの先頭行にファイルのヘッダーが含まれているかどうかを指定するフラグ。true の場合、列の配置は、CSV ファイルの先頭行によって決まります。列位置は、
value.schema
で指定されたスキーマの位置から推測されます。true に設定した場合、列数が、スキーマのフィールド数以上であることが必要です。- 重要度: 中
- 型: BOOLEAN
- デフォルト値: false
csv.escape.char
特殊文字を示す、整数フォーム(ASCII コード)の文字。通常、CSV ファイルでは
\(92)
が使用されます。- 重要度: 低
- 型: INT
- デフォルト値: 92
csv.file.charset
ファイルを読み取るために使用される文字セット。
- 重要度: 低
- 型: STRING
- デフォルト値: UTF-8
- 指定可能な値: Big5、Big5-HKSCS、CESU-8、EUC-JP、EUC-KR、GB18030、GB2312、GBK、IBM-Thai、IBM00858、IBM01140、IBM01141、IBM01142、IBM01143、IBM01144、IBM01145、IBM01146、IBM01147、IBM01148、IBM01149、IBM037、IBM1026、IBM1047、IBM273、IBM277、IBM278、IBM280、IBM284、IBM285、IBM290、IBM297、IBM420、IBM424、IBM437、IBM500、IBM775、IBM850、IBM852、IBM855、IBM857、IBM860、IBM861、IBM862、IBM863、IBM864、IBM865、IBM866、IBM868、IBM869、IBM870、IBM871、IBM918、ISO-2022-CN、ISO-2022-JP、ISO-2022-JP-2、ISO-2022-KR、ISO-8859-1、ISO-8859-13、ISO-8859-15、ISO-8859-2、ISO-8859-3、ISO-8859-4、ISO-8859-5、ISO-8859-6、ISO-8859-7、ISO-8859-8、ISO-8859-9、JIS_X0201、JIS_X0212-1990、KOI8-R、KOI8-U、Shift_JIS、TIS-620、US-ASCII、UTF-16、UTF-16BE、UTF-16LE、UTF-32、UTF-32BE、UTF-32LE、UTF-8、windows-1250、windows-1251、windows-1252、windows-1253、windows-1254、windows-1255、windows-1256、windows-1257、windows-1258、windows-31j、x-Big5-HKSCS-2001、x-Big5-Solaris、x-COMPOUND_TEXT、x-euc-jp-linux、x-EUC-TW、x-eucJP-Open、x-IBM1006、x-IBM1025、x-IBM1046、x-IBM1097、x-IBM1098、x-IBM1112、x-IBM1122、x-IBM1123、x-IBM1124、x-IBM1166、x-IBM1364、x-IBM1381、x-IBM1383、x-IBM300、x-IBM33722、x-IBM737、x-IBM833、x-IBM834、x-IBM856、x-IBM874、x-IBM875、x-IBM921、x-IBM922、x-IBM930、x-IBM933、x-IBM935、x-IBM937、x-IBM939、x-IBM942、x-IBM942C、x-IBM943、x-IBM943C、x-IBM948、x-IBM949、x-IBM949C、x-IBM950、x-IBM964、x-IBM970、x-ISCII91、x-ISO-2022-CN-CNS、x-ISO-2022-CN-GB、x-iso-8859-11、x-JIS0208、x-JISAutoDetect、x-Johab、x-MacArabic、x-MacCentralEurope、x-MacCroatian、x-MacCyrillic、x-MacDingbat、x-MacGreek、x-MacHebrew、x-MacIceland、x-MacRoman、x-MacRomania、x-MacSymbol、x-MacThai、x-MacTurkish、x-MacUkraine、x-MS932_0213、x-MS950-HKSCS、x-MS950-HKSCS-XP、x-mswin-936、x-PCK、x-SJIS_0213、x-UTF-16LE-BOM、X-UTF-32BE-BOM、X-UTF-32LE-BOM、x-windows-50220、x-windows-50221、x-windows-874、x-windows-949、x-windows-950、x-windows-iso2022jp
csv.ignore.leading.whitespace
先頭の空白文字を無視するかどうかを設定します。true(デフォルト)に設定した場合、フィールド内のクォートの前にある空白文字は無視されます。
- 重要度: 低
- 型: BOOLEAN
- デフォルト値: true
csv.ignore.quotations
引用符を無視するかどうかを設定します。true に設定した場合、引用符は無視されます。
- 重要度: 低
- 型: BOOLEAN
- デフォルト値: false
csv.keep.carriage.return
行末のキャリッジリターンを維持するかどうかを決定するフラグ。
- 重要度: 低
- 型: BOOLEAN
- デフォルト値: false
csv.null.field.indicator
CSV リーダーでフィールドが null かどうかを判断する方法を決定するインジケーター。指定可能な値は、
EMPTY_SEPARATORS
、EMPTY_QUOTES
、BOTH
、またはNEITHER
(デフォルト)です。詳細については、Opencsv のドキュメント を参照してください。- 重要度: 低
- 型: STRING
- デフォルト値: NEITHER
- 指定可能な値:
EMPTY_SEPARATORS
、EMPTY_QUOTES
、BOTH
、NEITHER
csv.quote.char
フィールドをクォートするために使用される文字。通常、
csv.separator.char
に指定する文字がデータ内にある場合に使用されます。- 重要度: 低
- 型: INT
- デフォルト値: 34
csv.separator.char
各フィールドを区切る文字を整数で指定します。通常、CSV ファイルでは
,(44)
、TSV ファイルではtab(9)
が使用されます。csv.separator.char
がnull(0)
として定義されている場合、デフォルトで RFC 4180 パーサーを使用する必要があります。これは、csv.rfc.4180.parser.enabled = true
の設定と同等です。- 重要度: 低
- 型: INT
- デフォルト値: 44
csv.skip.lines
ファイルの先頭でスキップする行数。
- 重要度: 低
- 型: INT
- デフォルト値: 0
csv.strict.quotes
厳密なクォートの設定を指定します。true の場合、クォートの外側の文字は無視されます。
- 重要度: 低
- 型: BOOLEAN
- デフォルト値: false
csv.verify.reader
リーダーを検証する必要があるかどうかを決定するフラグ。
- 重要度: 低
- 型: BOOLEAN
- デフォルト値: true