Schemaless JSON Source Connector for Confluent Platform¶
このコネクターは、ディレクトリから JSON ファイルをストリーミング するために使用されます。JSON レコードをスキーマに変換しようとはしません。使用する推奨コンバーターは StringConverter です。
value.converter=org.apache.kafka.connect.storage.StringConverter
このコネクターを使用するには、connector.class
構成プロパティでこのコネクタークラスの名前を指定するコネクター構成を使用します。
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector
コネクター固有の他の構成プロパティについて、以降で説明します。
JSON Schemaless Source Connector の例¶
この例では、クイックスタートと同じ手順に従います。クイックスタートを参照しながら、Confluent Platform を実行し、Spool Dir コネクターをインストールします。
- 前提条件
- Confluent Platform
- Confluent CLI (個別のインストールが必要)
以下のコマンドを使用して、JSON データセットを生成します。
curl "https://api.mockaroo.com/api/17c84440?count=500&key=25fd9c80" > "json-spooldir-source.json"
spooldir.properties
ファイルを以下の内容で作成します。name=SchemaLessJsonSpoolDir tasks.max=1 connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector input.path=/path/to/data input.file.pattern=json-spooldir-source.json error.path=/path/to/error finished.path=/path/to/finished halt.on.error=false topic=spooldir-schemaless-json-topic value.converter=org.apache.kafka.connect.storage.StringConverter
Confluent CLI で confluent local services connect connector load コマンドを使用して SpoolDir Schemaless JSON Source Connector を読み込みます。
注意
トピック名とフラグの間には、ダブルダッシュ(
--
)を入れる必要があります。詳細については、こちらの投稿 を参照してください。confluent local services connect connector load spooldir --config spooldir.properties
重要
Confluent CLI コマンドは、本稼働環境では使用しないでください。
構成プロパティ¶
全般¶
topic
データの書き込み先 Kafka トピック。
- 重要度: 高
- 型: STRING
batch.size
各バッチで返されるレコード数。
- 重要度: 低
- 型: INT
- デフォルト値: 1000
empty.poll.wait.ms
ポーリングで空のレコードリストが返された場合の待ち時間。
- 重要度: 低
- 型: LONG
- デフォルト値: 500
- バリデーター: [1,…,9223372036854775807]
メタデータ¶
metadata.field
値においてメタデータが格納されるフィールドの名前。
- 重要度: 低
- 型: STRING
- デフォルト値: metadata
metadata.location
入力ファイルに関するメタデータの格納先。FIELD - 入力ファイルに関するメタデータは、レコードの値に含まれるフィールドに格納されます。HEADERS - 入力ファイルに関するメタデータは、レコードのヘッダーとして格納されます。NONE - 入力ファイルに関するメタデータは格納されません。
- 重要度: 低
- 型: STRING
- デフォルト値: HEADERS
- 指定可能な値:
NONE
、HEADERS
、FIELD
トピックの自動作成¶
トピックの自動作成の詳細については、「ソースコネクターのトピックの自動作成の構成 」を参照してください。
注釈
構成プロパティには、Java regex として定義されている正規表現(regex)を使用できます。
topic.creation.groups
一致するトピックにグループ別のトピック構成を定義するために使用するグループ別名のリスト。
default
グループは常に存在し、すべてのトピックに一致します。- 型: String 型のリスト
- デフォルト: 空
- 指定可能な値: このプロパティの値には、任意の追加グループを指定できます。トピックの構成には常に
default
グループが定義されています。
topic.creation.$alias.replication.factor
コネクターで作成する新規トピックのレプリケーション係数。この値は、Kafka クラスターのブローカーの数を超えてはなりません。この値が Kafka ブローカーの数よりも大きい場合、コネクターがトピックの作成を試行するとエラーが発生します。
default
グループの場合、これは 必須のプロパティ です。topic.creation.groups
で定義されている他のグループの場合、このプロパティは省略可能です。他のグループでは、Kafka ブローカーのデフォルト値が使用されます。- 型: int
- デフォルト: なし
- 指定可能な値 : 具体的な有効値を指定する場合は
>= 1
で指定し、Kafka ブローカーのデフォルト値を使用する場合は-1
を指定します。
topic.creation.$alias.partitions
このコネクターによって作成されるトピックのパーティションの数。
default
グループの場合、これは 必須のプロパティ です。topic.creation.groups
で定義されている他のグループの場合、このプロパティは省略可能です。他のグループでは、Kafka ブローカーのデフォルト値が使用されます。- 型: int
- デフォルト: なし
- 指定可能な値 : 具体的な有効値を指定する場合は
>= 1
で指定し、Kafka ブローカーのデフォルト値を使用する場合は-1
を指定します。
topic.creation.$alias.include
トピック名に一致する正規表現を表す文字列のリスト。このリストは、一致する値を持つトピックを対象に含め、このグループの特定の構成を一致するトピックに適用するために使用します。
topic.creation.groups
で定義されているすべてのグループに、$alias
が適用されます。default
グループには、このプロパティは適用されません。- 型: String 型のリスト
- デフォルト: 空
- 指定可能な値: 正確なトピック名または正規表現のコンマ区切りのリスト。
topic.creation.$alias.exclude
トピック名に一致する正規表現を表す文字列のリスト。このリストは、一致する値を持つトピックを、グループの特定の構成の適用から除外するために使用します。
topic.creation.groups
で定義されているすべてのグループに、$alias
が適用されます。default
グループには、このプロパティは適用されません。トピックの除外ルールは、すべての包含ルールをオーバーライドすることに注意してください。- 型: String 型のリスト
- デフォルト: 空
- 指定可能な値: 正確なトピック名または正規表現のコンマ区切りのリスト。
topic.creation.$alias.${kafkaTopicSpecificConfigName}
レコードの書き込み先の Kafka ブローカーのバージョンに対するすべての ブローカー構成の動的な変更 を行います。ルールに対して構成が指定されていない場合、ブローカーのトピックレベルの構成値が使用されます。
default
グループ、およびtopic.creation.groups
で定義されているすべてのグループに、$alias
が適用されます。- 型: プロパティ値
- デフォルト : Kafka ブローカー値
ファイルシステム¶
error.path
エラーを含むファイルを配置するディレクトリ。このディレクトリが存在し、Kafka Connect を実行しているユーザーによる書き込みが可能である必要があります。
- 重要度: 高
- 型: STRING
- バリデーター: 存在し、書き込み可能であるディレクトリへの絶対パス。
input.file.pattern
入力ファイル名と照合する正規表現。ファイル名全体と一致する正規表現を使用してください。
Matcher.matches()
と同等。有効な構文の定義については、「Class Pattern」を参照してください。- 重要度: 高
- 型: STRING
input.path
Kafka Connect が処理対象のファイルを読み取るディレクトリ。このディレクトリが存在し、Connect を実行しているユーザーによる書き込みが可能である必要があります。
- 重要度: 高
- 型: STRING
- バリデーター: 存在し、書き込み可能であるディレクトリへの絶対パス。
finished.path
Connect が正常に処理されたファイルを配置するディレクトリ。このディレクトリが存在し、Connect を実行しているユーザーによる書き込みが可能である必要があります。
- 重要度: 高
- 型: STRING
halt.on.error
エラー発生時に、タスクを中止するか次のファイルに進むかを設定します。
- 重要度: 高
- 型: BOOLEAN
- デフォルト値: true
cleanup.policy
正常に処理されたファイルをコネクターでクリーンアップする方法を決定します。NONE を指定するとファイルがそのまま残されますが、コネクターが再起動された場合に再処理される原因になる可能性があります。DELETE を指定すると、ファイルシステムからファイルが削除されます。MOVE を指定すると、完了したファイル用のディレクトリにファイルが移動されます。MOVEBYDATE を指定すると、完了したファイル用のディレクトリに日付ごとのサブディレクトリが作成されてファイルが移動されます。
- 重要度: 中
- 型: STRING
- デフォルト値: MOVE
- 指定可能な値:
NONE
、DELETE
、MOVE
、MOVEBYDATE
task.partitioner
複数のタスクを使用するようコネクターを構成した場合は、タスクパーティショナーの実装が使用されます。タスクで処理されるファイルを特定するために各タスクで使用されます。これにより、各ファイルの割り当て先タスクが 1 つに限定されます。
- 重要度: 中
- 型: STRING
- デフォルト値: ByName
- バリデーター : 一致文字列:
ByName
file.buffer.size.bytes
BufferedInputStream のバッファサイズ。ファイルシステムとのやり取りで使用されます。
- 重要度: 低
- 型: INT
- デフォルト値: 131072
- バリデーター: [1,…]
file.minimum.age.ms
ファイルへの最後の書き込みが終わってからファイルを処理できるようになるまでの待ち時間(ミリ秒)。
- 重要度: 低
- 型: LONG
- デフォルト値: 0
- バリデーター: [0,…]
files.sort.attributes
各ファイルによってソート順序の決定に使用される属性。Name はファイルの名前です。Length はファイルの長さで、長いファイルほど先になります。LastModified はファイルの LastModified 属性で、古いファイルほど先になります。
- 重要度: 低
- 型: LIST
- デフォルト値: [NameAsc]
- バリデーター : 一致文字列:
NameAsc
、NameDesc
、LengthAsc
、LengthDesc
、LastModifiedAsc
、LastModifiedDesc
processing.file.extension
ファイルは、処理される前に、現在処理されていることを示す名前に変更されます。この設定は、ファイルの末尾に追加されます。
- 重要度: 低
- 型: STRING
- デフォルト値: .PROCESSING
- バリデーター : 一致文字列: regex( ^.*..+$ )
タイムスタンプ¶
timestamp.mode
コネクターで ConnectRecord にタイムスタンプがどのように設定されるかを決定します。
Field
を設定した場合、タイムスタンプは値のフィールドから読み取られます。このフィールドは、省略可能とすることはできません。また、タイムスタンプ であることが必要です。フィールドは、timestamp.field
で指定します。FILE_TIME
を設定した場合、ファイルの最終更新時刻が使用されます。PROCESS_TIME
(デフォルト)を設定した場合、レコードが読み取られた時刻が使用されます。- 重要度: 中
- 型: STRING
- デフォルト値: PROCESS_TIME
- 指定可能な値:
FIELD
、FILE_TIME
、PROCESS_TIME