SFTP Source Connector for Confluent Cloud¶
注釈
If you are installing the connector locally for Confluent Platform, see SFTP Source Connector for Confluent Platform.
フルマネージド型 Kafka Connect SFTP Source Connector は、ファイルの SFTP ディレクトリをモニタリングし、新しいファイルがディレクトリに書き込まれたときにそのデータを読み取ることができます。各ファイルは input.file.parser.format
構成で使用される以下のいずれかのプロパティ値に基づいて解析されます。これらの値も UI で選択できます。
BINARY
CSV
JSON
(デフォルト)SCHEMALESS_JSON
読み取られたファイルは finished.path
ディレクトリまたは error.path
ディレクトリに配置されます。
機能¶
SFTP Source Connector は、次の機能をサポートしています。
- 少なくとも 1 回のデリバリー: コネクターによって、レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます(解析されたファイル行が有効な場合)。
- 1 つのタスクをサポート: コネクターは、コネクターインスタンスごとに 1 つのタスクの実行をサポートします。
- サポートされている出力データフォーマット: このコネクターは出力データフォーマットとして、Avro、JSON スキーマ(JSON-SR)、Protobuf、JSON(スキーマレス)、Bytes、String フォーマットをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマ、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「環境の制限」を参照してください。
構成プロパティの値と説明については、「構成プロパティ」を参照してください。
「Cloud コネクターの制限事項」も参照してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud SFTP Source Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、SFTP ホストからデータを取得するようにコネクターを構成するための基本的な方法について説明します。
- 前提条件
- アマゾンウェブサービス ( AWS )、Microsoft Azure ( Azure )、または Google Cloud Platform ( GCP )上の Confluent Cloud クラスターへのアクセスを許可されていること。
- Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
- SFTP ホストへのアクセス。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。
- ソースコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Confluent Cloud を使用した Apache Kafka のクイックスタート」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
ステップ 4: 接続をセットアップします。¶
注釈
1 つまたは複数のトピックを選択します。
Name にコネクター名を入力します。
Output Kafka record value で Kafka 出力レコード値のフォーマットを選択します。このコネクターは Kafka 出力レコード値のフォーマットとして、Avro、JSON スキーマ(JSON_SR)、Protobuf、JSON(スキーマレス)、Bytes、String フォーマットをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマ、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「環境の制限」を参照してください。
注釈
出力レコード値と次の入力ファイルパーサーフォーマットプロパティとの間には以下の関係があります。
BINARY
入力ファイルパーサー を選択する場合は、Kafka 出力レコード値フォーマットにBYTES
を選択する必要があります。SCHEMALESS_JSON
入力ファイルパーサーを選択する場合は、Kafka 出力レコード値フォーマットにSTRING
を選択する必要があります。- デフォルトの
JSON
入力ファイルパーサーを使用したり、CSV
入力ファイルパーサーを選択したりする場合は、Kafka 出力レコード値フォーマットに任意のフォーマットを使用できます。
Input file parser で、入力ファイルパーサーフォーマットを選択します。これは、SFTP ディレクトリからフェッチしたファイルの解析に使用されるパーサーフォーマットです。デフォルトは
JSON
です。重要
JSON
(デフォルト)またはCSV
を入力ファイルパーサーフォーマットとして使用する場合、Schema generation enabled をtrue
に設定する必要があります。この構成をfalse
に設定する場合、キースキーマと値スキーマを指定する必要があります。Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。サービスアカウントのリソース ID を選択するか、API キーとシークレットを入力できます(または、Cloud Console でこれらを生成します)。
SFTP Details に詳細情報を入力します。
- SFTP Host: SFTP サーバーのホストアドレスを入力します。たとえば、
192.168.1.231
のようになります。SFTP Port のデフォルト値は22
です。 - Username: コネクターがホストに接続するのに使用するユーザー名を入力します。
- Password: 入力したユーザー名のパスワードを入力します。PEM ファイルを使用してホストに対するキーベースの認証を行う場合、パスワードは不要です。
- SFTP Host: SFTP サーバーのホストアドレスを入力します。たとえば、
SFTP directory に SFTP ディレクトリの詳細を追加します。
- Input path: コネクターが正常に処理されたファイルを配置する SFTP ディレクトリ。このディレクトリが存在し、コネクターによる書き込みが可能である必要があります。
- Finished path: 処理されるファイルをコネクターが読み取る SFTP ディレクトリ。このディレクトリが存在し、コネクターによる書き込みが可能である必要があります。
- Error path: エラーがあるファイルをコネクターが配置する SFTP ディレクトリ。このディレクトリが存在し、コネクターによる書き込みが可能である必要があります。
Input file pattern (regex) を選択: 入力ファイル名を照合するための正規表現を選択します。ファイル名全体と一致する正規表現を使用してください。これは
Matcher.matches()
と同等です。.*
を使用すると、ディレクトリ内のすべてのファイルを受け入れます。Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
See 構成プロパティ for all property values and descriptions.
ステップ 5: コネクターを起動します。¶
接続の詳細情報を確認し、Launch をクリックします。
ステップ 6: コネクターのステータスを確認します。¶
コネクターのステータスが Provisioning から Running に変わります。
ステップ 7: レコードを確認します。¶
レコードが Kafka のトピックに生成されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
Confluent CLI の使用¶
Confluent CLI でコネクターをセットアップして実行するには、次の手順を実行します。
注釈
すべての 前提条件 を満たしていることを確認します。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe SftpSource
出力例:
Following are the required configs:
connector.class: SftpSource
name
kafka.api.key
kafka.api.secret
kafka.topic
output.data.format
sftp.host
sftp.username
input.path
finished.path
error.path
input.file.pattern
tasks.max
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"connector.class": "SftpSource",
"name": "SftpSourceConnector_0",
"kafka.api.key": "****************",
"kafka.api.secret": "*********************************",
"kafka.topic": "orders",
"output.data.format": "JSON",
"input.file.parser.format": "CSV",
"schema.generation.enable": "true",
"sftp.host": "192.168.1.231",
"sftp.username": "connect-user",
"sftp.password:": "****************",
"input.path": "/path/to/data",
"finished.path": "/path/to/finished",
"error.path": "/path/to/error",
"input.file.pattern": "csv-sftp-source.csv",
"tasks.max": "1",
}
以下のプロパティ定義に注意してください。
"connector.class"
: コネクターのプラグイン名を指定します。"name"
: 新しいコネクターの名前を設定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"kafka.topic"
: トピック名または複数のトピック名のコンマ区切りリストを入力します。"output.data.format"
: コネクターは Kafka 出力レコード値フォーマットとして、Avro、JSON スキーマ(JSON_SR)、Protobuf、JSON(スキーマレス)、Bytes、および String をサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマ、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「環境の制限」を参照してください。注釈
output.data.format
とinput.file.parser.format
プロパティの間には次の関係があります。BINARY
をinput.file.parser.format
に使用する場合は、BYTES
をoutput.data.format
に使用する必要があります。SCHEMALESS_JSON
をinput.file.parser.format
に使用する場合は、STRING
をoutput.data.format
に使用する必要があります。input.file.parser.format
に対して、これをJSON
(デフォルト)のままにしたりCSV
を使用する場合、output.data.format
については任意のフォーマットを使用できます。
"input.file.parser.format"
: SFTP ディレクトリからフェッチしたファイルの解析に使用されるパーサーフォーマットです。デフォルトはJSON
です。指定可能なオプションはBINARY
、CSV
、JSON
、SCHEMALESS_JSON
です。重要
JSON
(デフォルト)またはCSV
をinput.file.parser.format
として使用する場合、schema.generation.enable
プロパティを追加してこれをtrue
に設定する必要があります。このプロパティをfalse
に設定する場合、key.schema
とvalue.schema
を指定する必要があります。"sftp.host"
: SFTP サーバーのホストアドレスを入力します。たとえば、192.168.1.231
のようになります。ポートのデフォルト値は22
です。これを変更するには、プロパティ"sftp.port"
を追加します。"sftp.username"
: コネクターがホストに接続するのに使用するユーザー名を入力します。PEM ファイルを使用してホストに対するキーベースの認証を行う場合、"sftp.password"
プロパティは不要です。"input.path"
: 正常に処理されたファイルをコネクターが配置する SFTP ディレクトリを追加します。このディレクトリが存在し、コネクターによる書き込みが可能である必要があります。"finished.path"
: 処理されるファイルをコネクターが読み取る SFTP ディレクトリを追加します。このディレクトリが存在し、コネクターによる書き込みが可能である必要があります。"error.path"
: エラーがあるファイルをコネクターが配置する SFTP ディレクトリを追加します。このディレクトリが存在し、コネクターによる書き込みが可能である必要があります。"input.file.pattern"
: 入力ファイル名を照合するための正規表現を追加します。ファイル名全体と一致する正規表現を使用してください。Matcher.matches()
と同等です。.*
を使用すると、ディレクトリ内のすべてのファイルを受け入れます。"tasks.max"
: コネクターは、1 コネクターにつき、実行中の タスク 1 つをサポートします。
Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
See 構成プロパティ for all property values and descriptions.
ステップ 3: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config sftp-source-config.json
出力例:
Created connector SftpSourceConnector_0 lcc-do6vzd
ステップ 4: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type | Trace
+------------+-----------------------------+---------+--------+-------+
lcc-do6vzd | SftpSourceConnector_0 | RUNNING | source | |
ステップ 5: Kafka トピックを確認します。¶
レコードが Kafka のトピックに生成されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
Use the following configuration properties with this connector.
How should we connect to your data?¶
name
Sets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: SERVICE_ACCOUNT, KAFKA_API_KEY
- Importance: high
kafka.api.key
- Type: password
- Importance: high
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- Type: string
- Importance: high
kafka.api.secret
- Type: password
- Importance: high
Which topic do you want to send data to?¶
kafka.topic
Identifies the topic name to write the data to.
- Type: string
- Importance: high
Output messages¶
output.data.format
Sets the output message format. Valid entries are AVRO, JSON_SR, PROTOBUF, JSON, STRING or BYTES. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF
- Type: string
- Importance: high
Input file parser format¶
input.file.parser.format
Parser that should be used to parse fetched files from sftp directory
- Type: string
- Default: JSON
- Importance: high
SFTP Details¶
sftp.host
Host address of the SFTP server.
- Type: string
- Importance: high
sftp.port
Port number of the SFTP server.
- Type: int
- Default: 22
- Importance: medium
sftp.username
Username for the SFTP connection.
- Type: string
- Importance: high
sftp.password
Password for the SFTP connection (not required if using TLS).
- Type: password
- Importance: high
tls.pemfile
PEM file to be used for authentication via TLS.
- Type: password
- Importance: high
tls.passphrase
Passphrase that will be used to decrypt the private key if the given private key is encrypted.
- Type: password
- Importance: high
SFTP directory¶
input.path
The SFTP directory to read files that will be processed.This directory must exist and be writable by the user running Kafka Connect.
- Type: string
- Importance: high
finished.path
The SFTP directory to place files that have been successfully processed. This directory must exist and be writable by the user running Kafka Connect.
- Type: string
- Importance: high
error.path
The SFTP directory to place files in which there are error(s). This directory must exist and be writable by the user running Kafka Connect.
- Type: string
- Importance: high
File System¶
cleanup.policy
Determines how the connector should cleanup the files that have been successfully processed. NONE leaves the files in place which could cause them to be reprocessed if the connector is restarted. DELETE removes the file from the filesystem. MOVE will move the file to a finished directory.
- Type: string
- Default: MOVE
- Importance: medium
input.file.pattern
Regular expression to check input file names against. This expression must match the entire filename. The equivalent of Matcher.matches().
- Type: string
- Importance: high
behavior.on.error
Should the task halt when it encounters an error or continue to the next file.
- Type: string
- Default: FAIL
- Importance: high
file.minimum.age.ms
The amount of time in milliseconds after the file was last written to before the file can be processed. For default 0, connector processes all files irrespective of age
- Type: long
- Default: 0
- Importance: low
Connection details¶
batch.size
The number of records that should be returned with each batch.
- Type: int
- Default: 1000
- Importance: low
empty.poll.wait.ms
The amount of time to wait if a poll returns an empty list of records.
- Type: long
- Default: 250
- Importance: low
Schema¶
key.schema
The schema for the key written to Kafka.
- Type: string
- Importance: high
value.schema
The schema for the value written to Kafka.
- Type: string
- Importance: high
Schema Generation¶
schema.generation.enabled
Flag to determine if schemas should be dynamically generated. If set to true, key.schema and value.schema can be omitted, but schema.generation.key.name and schema.generation.value.name must be set.
- Type: boolean
- Importance: medium
schema.generation.key.fields
The field(s) to use to build a key schema. This is only used during schema generation.
- Type: list
- Importance: medium
schema.generation.key.name
生成されるキースキーマの名前。
- Type: string
- Importance: medium
schema.generation.value.name
生成される値スキーマの名前。
- Type: string
- Importance: medium
Timestamps¶
timestamp.mode
Determines how the connector will set the timestamp for the ConnectRecord. If set to FIELD then the timestamp will be read from a field in the value. This field cannot be optional and must be a Timestamp. Specify the field in timestamp.field. If set to FILE_TIME then the last modified time of the file will be used. If set to PROCESS_TIME the time the record is read will be used.
- Type: string
- Importance: medium
timestamp.field
The field in the value schema that will contain the parsed timestamp for the record. This field cannot be marked as optional and must be a [Timestamp] (https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.html)
- Type: string
- Importance: medium
parser.timestamp.timezone
The timezone that all of the dates will be parsed with.
- Type: string
- Importance: low
parser.timestamp.date.formats
The date formats that are expected in the file. This is a list of strings that will be used to parse the date fields in order. The most accurate date format should be the first in the list. Take a look at the Java documentation for more info. https://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html
- Type: list
- Importance: low
CSV Parsing¶
csv.skip.lines
Number of lines to skip in the beginning of the file.
- Type: int
- Default: 0
- Importance: low
csv.separator.char
The character that separates each field in the form of an integer. Typically in a CSV this is a ,(44) character. A TSV would use a tab(9) character. If csv.separator.char is defined as a null(0), then the RFC 4180 parser must be utilized by default. This is the equivalent of csv.rfc.4180.parser.enabled = true.
- Type: int
- Default: 44
- Importance: low
csv.quote.char
The character that is used to quote a field. Typically in a CSV this is a "(34) character. This typically happens when the csv.separator.char character is within the data.
- Type: int
- Default: 34
- Importance: low
csv.escape.char
The character as an integer to use when a special character is encountered. The default escape character is typically a (92)
- Type: int
- Default: 92
- Importance: low
csv.strict.quotes
Sets the strict quotes setting - if true, characters outside the quotes are ignored.
- Type: string
- Default: false
- Importance: low
csv.ignore.leading.whitespace
Sets the ignore leading whitespace setting - if true, white space in front of a quote in a field is ignored.
- Type: string
- Importance: low
csv.ignore.quotations
Sets the ignore quotations mode - if true, quotations are ignored.
- Type: string
- Default: false
- Importance: low
csv.keep.carriage.return
Flag to determine if the carriage return at the end of the line should be maintained.
- Type: string
- Default: false
- Importance: low
csv.null.field.indicator
Indicator to determine how the CSV Reader can determine if a field is null. Valid values are EMPTY_SEPARATORS, EMPTY_QUOTES, BOTH, NEITHER. For more information see http://opencsv.sourceforge.net/apidocs/com/opencsv/enums/CSVReaderNullFieldIndicator.html.
- Type: string
- Default: NEITHER
- Importance: low
csv.first.row.as.header
Flag to indicate if the fist row of data contains the header of the file. If true the position of the columns will be determined by the first row to the CSV. The column position will be inferred from the position of the schema supplied in value.schema. If set to true the number of columns must be greater than or equal to the number of fields in the schema.
- Type: string
- Importance: medium
csv.file.charset
Character set to read wth file with.
- Type: string
- Default: UTF-8
- Importance: low
Number of tasks for this connector¶
tasks.max
- Type: int
- Valid Values: [1,...,1]
- Importance: high
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。