重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
SFTP Source Connector for Confluent Cloud¶
注釈
Confluent Platform 用にコネクターをローカルにインストールする場合は、「SFTP Source Connector for Confluent Platform」を参照してください。
フルマネージド型 Kafka Connect SFTP Source Connector は、ファイルの SFTP ディレクトリをモニタリングし、新しいファイルがディレクトリに書き込まれたときにそのデータを読み取ることができます。各ファイルは input.file.parser.format
構成で使用される以下のいずれかのプロパティ値に基づいて解析されます。これらの値も UI で選択できます。
BINARY
CSV
JSON
(デフォルト)SCHEMALESS_JSON
読み取られたファイルは finished.path
ディレクトリまたは error.path
ディレクトリに配置されます。
機能¶
SFTP Source Connector は、次の機能をサポートしています。
- 少なくとも 1 回のデリバリー: コネクターによって、レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます(解析されたファイル行が有効な場合)。
- 1 つのタスクをサポート: コネクターは、コネクターインスタンスごとに 1 つのタスクの実行をサポートします。
- サポートされている出力データフォーマット: このコネクターは出力レコード値のフォーマットとして、Avro、JSON スキーマ(JSON-SR)、Protobuf、JSON(スキーマレス)、Bytes、String フォーマットをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマ、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
制限¶
以下の情報を確認してください。
- コネクターの制限事項については、SFTP Source Connector の制限事項を参照してください。
- 1 つ以上の Single Message Transforms(SMT)を使用する場合は、「SMT の制限」を参照してください。
- Confluent Cloud Schema Registry を使用する場合は、「スキーマレジストリ Enabled Environments」を参照してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud SFTP Source Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、SFTP ホストからデータを取得するようにコネクターを構成するための基本的な方法について説明します。
- 前提条件
- アマゾンウェブサービス ( AWS )、Microsoft Azure ( Azure )、または Google Cloud Platform ( GCP )上の Confluent Cloud クラスターへのアクセスを許可されていること。
- Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
- SFTP ホストへのアクセス。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。
- ソースコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Quick Start for Confluent Cloud」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
ステップ 3: コネクターを選択します。¶
Click the SFTP Source connector card.
注釈
At the Add SFTP Source Connector screen, complete the following:
- Select the way you want to provide Kafka Cluster credentials. You can
choose one of the following options:
- Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
- Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
- Use an existing API key: Allows you to enter an API key and secret part you have stored. You can enter an API key and secret (or generate these in the Cloud Console).
- Click Continue.
- SFTP Details に詳細情報を入力します。
- SFTP Host: The host address for the SFTP server. For example
192.168.1.231
. - SFTP Port: The port number of the SFTP server. Defaults to
22
. - Username: The username the connector will use to connect to the host.
- Password: The password for the SFTP connection. A password is not required if you're using TLS.
- Upload a PEM file.
- TLS passphrase: Used to decrypt the private key if the given private key is encrypted.
- SFTP Host: The host address for the SFTP server. For example
- Click Continue.
Add the SFTP directory details:
- Input file parser format: The parser that should be used to fetch files from the SFTP directory.
- Output message format: The connector supports Avro, JSON Schema, Protobuf, JSON, Bytes, and String output Kafka record value formats. Schema Registry must be enabled to use a スキーマレジストリ-based format (for example, Avro, JSON Schema, or Protobuf). See スキーマレジストリ Enabled Environments for additional information.
- Input path: The SFTP directory to read files that will be processed. This directory must exist and be by the user running Kafka Connect.
- Finished path: The SFTP directory to place files that have been successfully processed. This directory must exist and be writable by the user running Kafka Connect.
- Error path: The SFTP directory to place files in which there are errors. This directory must exist and be writable by the user running Kafka Connect.
- Input file pattern (regex): A regular expression to check input
file names against. This expression must match the entire filename.
This is equivalent to
Matcher.matches()
. Using.*
accepts all files in the directory. - For Schema generation enabled select whether schemas should be
dynamically generated–select
true
orfalse
.
Show advanced configurations
- Batch size: The number of records that should be returned with
each batch.
Empty poll wait (ms): The amount of time to wait if a poll returns and empty list of records.
Cleanup policy: Determines how the connector should clean up the files that have been processed.
Behavior on error: Whether the task should halt when it encounters an error or continue to next file.
File minimum age (ms): The amount of time in milliseconds after the file was last written to before the file can be processed.
For information about transforms and predicates, see the Single Message Transforms (SMT) documentation for details. See サポートされない変換 for a list of SMTs that are not supported with this connector.
Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- To change the number of tasks, use the Range Slider to select the desired number of tasks.
- Click Continue.
Verify the connection details by previewing the running configuration.
Once you've validated that the properties are configured to your satisfaction, click Launch.
コネクターのステータスが Provisioning から Running に変わります。
Step 5: Check for records.¶
レコードが Kafka のトピックに生成されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
Confluent CLI の使用¶
Confluent CLI でコネクターをセットアップして実行するには、次の手順を実行します。
注釈
すべての 前提条件 を満たしていることを確認します。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe SftpSource
出力例:
Following are the required configs:
connector.class: SftpSource
name
kafka.api.key
kafka.api.secret
kafka.topic
output.data.format
sftp.host
sftp.username
input.path
finished.path
error.path
input.file.pattern
tasks.max
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"connector.class": "SftpSource",
"name": "SftpSourceConnector_0",
"kafka.api.key": "****************",
"kafka.api.secret": "*********************************",
"kafka.topic": "orders",
"output.data.format": "JSON",
"input.file.parser.format": "CSV",
"schema.generation.enable": "true",
"sftp.host": "192.168.1.231",
"sftp.username": "connect-user",
"sftp.password:": "****************",
"input.path": "/path/to/data",
"finished.path": "/path/to/finished",
"error.path": "/path/to/error",
"input.file.pattern": "csv-sftp-source.csv",
"tasks.max": "1",
}
以下のプロパティ定義に注意してください。
"connector.class"
: コネクターのプラグイン名を指定します。"name"
: 新しいコネクターの名前を設定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"kafka.topic"
: トピック名または複数のトピック名のコンマ区切りリストを入力します。"output.data.format"
: コネクターは Kafka 出力レコード値フォーマットとして、Avro、JSON スキーマ(JSON_SR)、Protobuf、JSON(スキーマレス)、Bytes、および String をサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマ、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。注釈
output.data.format
とinput.file.parser.format
プロパティの間には次の関係があります。BINARY
をinput.file.parser.format
に使用する場合は、BYTES
をoutput.data.format
に使用する必要があります。SCHEMALESS_JSON
をinput.file.parser.format
に使用する場合は、STRING
をoutput.data.format
に使用する必要があります。input.file.parser.format
に対して、これをJSON
(デフォルト)のままにしたりCSV
を使用する場合、output.data.format
については任意のフォーマットを使用できます。
"input.file.parser.format"
: SFTP ディレクトリからフェッチしたファイルの解析に使用されるパーサーフォーマットです。デフォルトはJSON
です。指定可能なオプションはBINARY
、CSV
、JSON
、SCHEMALESS_JSON
です。重要
JSON
(デフォルト)またはCSV
をinput.file.parser.format
として使用する場合、schema.generation.enable
プロパティを追加してこれをtrue
に設定する必要があります。このプロパティをfalse
に設定する場合、key.schema
とvalue.schema
を指定する必要があります。"sftp.host"
: SFTP サーバーのホストアドレスを入力します。たとえば、192.168.1.231
のようになります。ポートのデフォルト値は22
です。これを変更するには、プロパティ"sftp.port"
を追加します。"sftp.username"
: コネクターがホストに接続するのに使用するユーザー名を入力します。PEM ファイルを使用してホストに対するキーベースの認証を行う場合、"sftp.password"
プロパティは不要です。"input.path"
: 正常に処理されたファイルをコネクターが配置する SFTP ディレクトリを追加します。このディレクトリが存在し、コネクターによる書き込みが可能である必要があります。"finished.path"
: 処理されるファイルをコネクターが読み取る SFTP ディレクトリを追加します。このディレクトリが存在し、コネクターによる書き込みが可能である必要があります。"error.path"
: エラーがあるファイルをコネクターが配置する SFTP ディレクトリを追加します。このディレクトリが存在し、コネクターによる書き込みが可能である必要があります。"input.file.pattern"
: 入力ファイル名を照合するための正規表現を追加します。ファイル名全体と一致する正規表現を使用してください。Matcher.matches()
と同等です。.*
を使用すると、ディレクトリ内のすべてのファイルを受け入れます。"tasks.max"
: コネクターは、1 コネクターにつき、実行中の タスク 1 つをサポートします。
Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。
ステップ 3: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config sftp-source-config.json
出力例:
Created connector SftpSourceConnector_0 lcc-do6vzd
ステップ 4: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type | Trace
+------------+-----------------------------+---------+--------+-------+
lcc-do6vzd | SftpSourceConnector_0 | RUNNING | source | |
ステップ 5: Kafka トピックを確認します。¶
レコードが Kafka のトピックに生成されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
このコネクターでは、以下のコネクター構成プロパティを使用します。
データへの接続方法(How should we connect to your data?)¶
name
コネクターの名前を設定します。
- 型: string
- 指定可能な値: 最大 64 文字の文字列
- 重要度: 高
Kafka クラスターの認証情報(Kafka Cluster credentials)¶
kafka.auth.mode
Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。
- 型: string
- デフォルト: KAFKA_API_KEY
- 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
データの送信先トピック(Which topic do you want to send data to?)¶
kafka.topic
データの書き込み先トピック名を指定します。
- 型: string
- 重要度: 高
出力メッセージ(Output messages)¶
output.data.format
出力メッセージフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、JSON、STRING、BYTES です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください
- 型: string
- 重要度: 高
入力ファイルパーサーフォーマット(Input file parser format)¶
input.file.parser.format
sftp ディレクトリからフェッチしたファイルの解析に使用する必要があるパーサー。
- 型: string
- デフォルト: JSON
- 重要度: 高
SFTP の詳細(SFTP Details)¶
sftp.host
SFTP サーバーのホストアドレス。
- 型: string
- 重要度: 高
sftp.port
SFTP サーバーのポート番号。
- 型: int
- デフォルト: 22
- 重要度: 中
sftp.username
SFTP 接続のユーザー名。
- 型: string
- 重要度: 高
sftp.password
SFTP 接続のパスワード(TLS を使用する場合は不要)。
- 型: password
- 重要度: 高
tls.pemfile
TLS による認証時に使用される PEM ファイル。
- 型: password
- 重要度: 高
tls.passphrase
指定したプライベートキーが暗号化されている場合にプライベートキーの復号化に使用するパスフレーズ。
- 型: password
- 重要度: 高
SFTP ディレクトリ(SFTP directory)¶
input.path
処理対象のファイルを読み取る SFTP ディレクトリ。このディレクトリが存在し、Kafka Connect を実行しているユーザーによる書き込みが可能である必要があります。
- 型: string
- 重要度: 高
finished.path
正常に処理されたファイルを配置する SFTP ディレクトリ。このディレクトリが存在し、Kafka Connect を実行しているユーザーによる書き込みが可能である必要があります。
- 型: string
- 重要度: 高
error.path
エラーを含むファイルを配置する SFTP ディレクトリ。このディレクトリが存在し、Kafka Connect を実行しているユーザーによる書き込みが可能である必要があります。
- 型: string
- 重要度: 高
ファイルシステム(File System)¶
cleanup.policy
正常に処理されたファイルをコネクターでクリーンアップする方法を決定します。NONE を指定するとファイルがそのまま残されますが、コネクターが再起動された場合に再処理される原因になる可能性があります。DELETE を指定すると、ファイルシステムからファイルが削除されます。MOVE を指定すると、完了したファイル用のディレクトリにファイルが移動されます。
- 型: string
- デフォルト: MOVE
- 重要度: 中
input.file.pattern
入力ファイル名と照合する正規表現。ファイル名全体と一致する正規表現を使用してください。Matcher.matches() と同等です。
- 型: string
- 重要度: 高
behavior.on.error
エラー発生時に、タスクを中止するか、次のファイルに進みます。
- 型: string
- デフォルト: FAIL
- 重要度: 高
file.minimum.age.ms
ファイルへの最後の書き込みが終わってからファイルを処理できるようになるまでの待ち時間(ミリ秒)。デフォルトの 0 の場合、コネクターは経過時間に関係なくすべてのファイルを処理します。
- 型: long
- デフォルト: 0
- 重要度: 低
接続の詳細(Connection details)¶
batch.size
各バッチで返されるレコード数。
- 型: int
- デフォルト: 1000
- 重要度: 低
empty.poll.wait.ms
ポーリングで空のレコードリストが返された場合の待ち時間。
- 型: long
- デフォルト: 250
- 重要度: 低
スキーマ(Schema)¶
key.schema
Kafka に書き込まれるキーのスキーマ。
- 型: string
- 重要度: 高
value.schema
Kafka に書き込まれる値のスキーマ。
- 型: string
- 重要度: 高
スキーマ生成(Schema Generation)¶
schema.generation.enabled
スキーマが動的に生成されるかどうかを決定するフラグ。true を設定した場合は、key.schema と value.schema を省略できますが、schema.generation.key.name と schema.generation.value.name は設定する必要があります。
- 型: boolean
- 重要度: 中
schema.generation.key.fields
キースキーマのビルドに使用されるフィールド。これは、スキーマ生成時にのみ使用されます。
- 型: list
- 重要度: 中
schema.generation.key.name
生成されるキースキーマの名前。
- 型: string
- 重要度: 中
schema.generation.value.name
生成される値スキーマの名前。
- 型: string
- 重要度: 中
タイムスタンプ(Timestamps)¶
timestamp.mode
コネクターで ConnectRecord にタイムスタンプがどのように設定されるかを決定します。FIELD を設定した場合は、値のフィールドからタイムスタンプが読み取られます。このフィールドは、省略可能とすることはできません。また、タイムスタンプであることが必要です。フィールドは、timestamp.field で指定します。FILE_TIME を設定した場合は、ファイルの最終更新時刻が使用されます。PROCESS_TIME を設定した場合は、レコードが読み取られた時刻が使用されます。
- 型: string
- 重要度: 中
timestamp.field
値スキーマ内で、対象となるレコードの解析されたタイムスタンプを含むフィールド。このフィールドは、省略可能とすることはできません。また、[Timestamp](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.html)であることが必要です。
- 型: string
- 重要度: 中
parser.timestamp.timezone
すべての日付の解析で使用されるタイムゾーン。
- 型: string
- 重要度: 低
parser.timestamp.date.formats
ファイルで想定される日付フォーマット。これは、日付フィールドを順番に解析するために使用される文字列のリストです。最も正確な日付フォーマットをリストの先頭にする必要があります。詳細については、Java のドキュメント(https://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html)を参照してください。
- 型: list
- 重要度: 低
CSV 解析(CSV Parsing)¶
csv.skip.lines
ファイルの先頭でスキップする行数。
- 型: int
- デフォルト: 0
- 重要度: 低
csv.separator.char
各フィールドを区切る文字を整数で指定します。CSV では通常、「,」(44)の文字です。TSV ではタブ(9)文字を使用します。csv.separator.char が null(0)として定義されている場合、デフォルトで RFC 4180 パーサーを使用する必要があります。これは、csv.rfc.4180.parser.enabled = true の設定と同等です。
- 型: int
- デフォルト: 44
- 重要度: 低
csv.quote.char
フィールドをクォートするために使用される文字。CSV では通常、「"」(34)の文字です。通常、csv.separator.char に指定する文字がデータ内にある場合に使用されます。
- 型: int
- デフォルト: 34
- 重要度: 低
csv.escape.char
特殊文字が出現した場合に使用する文字を整数で設定します。通常のデフォルトのエスケープ文字は「」(92)です。
- 型: int
- デフォルト: 92
- 重要度: 低
csv.strict.quotes
厳密なクォートの設定を指定します。true の場合、引用符の外側の文字は無視されます。
- 型: string
- デフォルト: false
- 重要度: 低
csv.ignore.leading.whitespace
先頭の空白文字を無視する設定を指定します。true の場合、フィールド内の引用符の前にある空白文字は無視されます。
- 型: string
- 重要度: 低
csv.ignore.quotations
引用符を無視するモードを設定します。true の場合引用符は無視されます。
- 型: string
- デフォルト: false
- 重要度: 低
csv.keep.carriage.return
行末のキャリッジリターンを維持するかどうかを決定するフラグ。
- 型: string
- デフォルト: false
- 重要度: 低
csv.null.field.indicator
CSV リーダーでフィールドが null かどうかを判断する方法を決定するインジケーター。指定可能な値は、EMPTY_SEPARATORS、EMPTY_QUOTES、BOTH、NEITHER です。詳細については、http://opencsv.sourceforge.net/apidocs/com/opencsv/enums/CSVReaderNullFieldIndicator.html を参照してください。
- 型: string
- デフォルト: NEITHER
- 重要度: 低
csv.first.row.as.header
データの先頭行にファイルのヘッダーが含まれているかどうかを示すフラグ。true の場合、列の配置は、CSV の先頭行によって決まります。列位置は、value.schema で指定されたスキーマの位置から推測されます。true に設定した場合、列数が、スキーマのフィールド数以上であることが必要です。
- 型: string
- 重要度: 中
csv.file.charset
ファイルを読み取るための文字セット。
- 型: string
- デフォルト: UTF-8
- 重要度: 低
ui.csv.pre.validate.file.enabled
Flag to enable validating the integrity of all records in the CSV file before processing any of its records. For example, if any of the records have a linefeed within an unquoted field, which would incorrectly break the record at that point, then the entire fil will be considered erroneous and no records from that file will be processed. The failed file would be moved to the configured error path. Important: If the number of records in a file is larger than the configured batch size, then portions of the file may be retrieved from the sftp server by the connector more than once.
- 型: string
- Default: NO
- Valid Values: NO, YES
- 重要度: 低
このコネクターのタスク数(Number of tasks for this connector)¶
tasks.max
- 型: int
- 指定可能な値: [1,...,1]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。