重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

SFTP Source Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「SFTP Source Connector for Confluent Platform」を参照してください。

フルマネージド型 Kafka Connect SFTP Source Connector は、ファイルの SFTP ディレクトリをモニタリングし、新しいファイルがディレクトリに書き込まれたときにそのデータを読み取ることができます。各ファイルは input.file.parser.format 構成で使用される以下のいずれかのプロパティ値に基づいて解析されます。これらの値も UI で選択できます。

  • BINARY
  • CSV
  • JSON (デフォルト)
  • SCHEMALESS_JSON

読み取られたファイルは finished.path ディレクトリまたは error.path ディレクトリに配置されます。

機能

SFTP Source Connector は、次の機能をサポートしています。

  • 少なくとも 1 回のデリバリー: コネクターによって、レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます(解析されたファイル行が有効な場合)。
  • 1 つのタスクをサポート: コネクターは、コネクターインスタンスごとに 1 つのタスクの実行をサポートします。
  • サポートされている出力データフォーマット: このコネクターは出力レコード値のフォーマットとして、Avro、JSON スキーマ(JSON-SR)、Protobuf、JSON(スキーマレス)、Bytes、String フォーマットをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマ、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud SFTP Source Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、SFTP ホストからデータを取得するようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • アマゾンウェブサービス ( AWS )、Microsoft Azure ( Azure )、または Google Cloud Platform ( GCP )上の Confluent Cloud クラスターへのアクセスを許可されていること。
  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
  • SFTP ホストへのアクセス。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。
  • ソースコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the SFTP Source connector card.

SFTP Sink Connector Card

注釈

  • すべての 前提条件 を満たしていることを確認します。
  • アスタリスク( * )は必須項目であることを示しています。
  • 次の各ステップで、必須構成プロパティの使用法に関する情報を示します。他の構成プロパティの値と説明については、「構成プロパティ」を参照してください。

At the Add SFTP Source Connector screen, complete the following:

Select the topic you want to send data to from the Topics list. To create a new topic, click +Add new topic.

Step 5: Check for records.

レコードが Kafka のトピックに生成されていることを確認します。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

Confluent CLI の使用

Confluent CLI でコネクターをセットアップして実行するには、次の手順を実行します。

注釈

すべての 前提条件 を満たしていることを確認します。

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe SftpSource

出力例:

Following are the required configs:
connector.class: SftpSource
name
kafka.api.key
kafka.api.secret
kafka.topic
output.data.format
sftp.host
sftp.username
input.path
finished.path
error.path
input.file.pattern
tasks.max

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "connector.class": "SftpSource",
  "name": "SftpSourceConnector_0",
  "kafka.api.key": "****************",
  "kafka.api.secret": "*********************************",
  "kafka.topic": "orders",
  "output.data.format": "JSON",
  "input.file.parser.format": "CSV",
  "schema.generation.enable": "true",
  "sftp.host": "192.168.1.231",
  "sftp.username": "connect-user",
  "sftp.password:": "****************",
  "input.path": "/path/to/data",
  "finished.path": "/path/to/finished",
  "error.path": "/path/to/error",
  "input.file.pattern": "csv-sftp-source.csv",
  "tasks.max": "1",
}

以下のプロパティ定義に注意してください。

  • "connector.class": コネクターのプラグイン名を指定します。
  • "name": 新しいコネクターの名前を設定します。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "kafka.topic": トピック名または複数のトピック名のコンマ区切りリストを入力します。

  • "output.data.format": コネクターは Kafka 出力レコード値フォーマットとして、Avro、JSON スキーマ(JSON_SR)、Protobuf、JSON(スキーマレス)、Bytes、および String をサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマ、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。

    注釈

    output.data.formatinput.file.parser.format プロパティの間には次の関係があります。

    • BINARYinput.file.parser.format に使用する場合は、BYTESoutput.data.format に使用する必要があります。
    • SCHEMALESS_JSONinput.file.parser.format に使用する場合は、STRINGoutput.data.format に使用する必要があります。
    • input.file.parser.format に対して、これを JSON (デフォルト)のままにしたり CSV を使用する場合、output.data.format については任意のフォーマットを使用できます。
  • "input.file.parser.format": SFTP ディレクトリからフェッチしたファイルの解析に使用されるパーサーフォーマットです。デフォルトは JSON です。指定可能なオプションは BINARYCSVJSONSCHEMALESS_JSON です。

    重要

    JSON (デフォルト)または CSVinput.file.parser.format として使用する場合、schema.generation.enable プロパティを追加してこれを true に設定する必要があります。このプロパティを false に設定する場合、key.schemavalue.schema を指定する必要があります。

  • "sftp.host": SFTP サーバーのホストアドレスを入力します。たとえば、192.168.1.231 のようになります。ポートのデフォルト値は 22 です。これを変更するには、プロパティ "sftp.port" を追加します。

  • "sftp.username": コネクターがホストに接続するのに使用するユーザー名を入力します。PEM ファイルを使用してホストに対するキーベースの認証を行う場合、"sftp.password" プロパティは不要です。

  • "input.path": 正常に処理されたファイルをコネクターが配置する SFTP ディレクトリを追加します。このディレクトリが存在し、コネクターによる書き込みが可能である必要があります。

  • "finished.path": 処理されるファイルをコネクターが読み取る SFTP ディレクトリを追加します。このディレクトリが存在し、コネクターによる書き込みが可能である必要があります。

  • "error.path": エラーがあるファイルをコネクターが配置する SFTP ディレクトリを追加します。このディレクトリが存在し、コネクターによる書き込みが可能である必要があります。

  • "input.file.pattern": 入力ファイル名を照合するための正規表現を追加します。ファイル名全体と一致する正規表現を使用してください。Matcher.matches() と同等です。.* を使用すると、ディレクトリ内のすべてのファイルを受け入れます。

  • "tasks.max": コネクターは、1 コネクターにつき、実行中の タスク 1 つをサポートします。

Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。

ステップ 3: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config sftp-source-config.json

出力例:

Created connector SftpSourceConnector_0 lcc-do6vzd

ステップ 4: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID           |             Name            | Status  | Type   | Trace
+------------+-----------------------------+---------+--------+-------+
lcc-do6vzd   | SftpSourceConnector_0       | RUNNING | source |       |

ステップ 5: Kafka トピックを確認します。

レコードが Kafka のトピックに生成されていることを確認します。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

データの送信先トピック(Which topic do you want to send data to?)

kafka.topic

データの書き込み先トピック名を指定します。

  • 型: string
  • 重要度: 高

出力メッセージ(Output messages)

output.data.format

出力メッセージフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、JSON、STRING、BYTES です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください

  • 型: string
  • 重要度: 高

入力ファイルパーサーフォーマット(Input file parser format)

input.file.parser.format

sftp ディレクトリからフェッチしたファイルの解析に使用する必要があるパーサー。

  • 型: string
  • デフォルト: JSON
  • 重要度: 高

SFTP の詳細(SFTP Details)

sftp.host

SFTP サーバーのホストアドレス。

  • 型: string
  • 重要度: 高
sftp.port

SFTP サーバーのポート番号。

  • 型: int
  • デフォルト: 22
  • 重要度: 中
sftp.username

SFTP 接続のユーザー名。

  • 型: string
  • 重要度: 高
sftp.password

SFTP 接続のパスワード(TLS を使用する場合は不要)。

  • 型: password
  • 重要度: 高
tls.pemfile

TLS による認証時に使用される PEM ファイル。

  • 型: password
  • 重要度: 高
tls.passphrase

指定したプライベートキーが暗号化されている場合にプライベートキーの復号化に使用するパスフレーズ。

  • 型: password
  • 重要度: 高

SFTP ディレクトリ(SFTP directory)

input.path

処理対象のファイルを読み取る SFTP ディレクトリ。このディレクトリが存在し、Kafka Connect を実行しているユーザーによる書き込みが可能である必要があります。

  • 型: string
  • 重要度: 高
finished.path

正常に処理されたファイルを配置する SFTP ディレクトリ。このディレクトリが存在し、Kafka Connect を実行しているユーザーによる書き込みが可能である必要があります。

  • 型: string
  • 重要度: 高
error.path

エラーを含むファイルを配置する SFTP ディレクトリ。このディレクトリが存在し、Kafka Connect を実行しているユーザーによる書き込みが可能である必要があります。

  • 型: string
  • 重要度: 高

ファイルシステム(File System)

cleanup.policy

正常に処理されたファイルをコネクターでクリーンアップする方法を決定します。NONE を指定するとファイルがそのまま残されますが、コネクターが再起動された場合に再処理される原因になる可能性があります。DELETE を指定すると、ファイルシステムからファイルが削除されます。MOVE を指定すると、完了したファイル用のディレクトリにファイルが移動されます。

  • 型: string
  • デフォルト: MOVE
  • 重要度: 中
input.file.pattern

入力ファイル名と照合する正規表現。ファイル名全体と一致する正規表現を使用してください。Matcher.matches() と同等です。

  • 型: string
  • 重要度: 高
behavior.on.error

エラー発生時に、タスクを中止するか、次のファイルに進みます。

  • 型: string
  • デフォルト: FAIL
  • 重要度: 高
file.minimum.age.ms

ファイルへの最後の書き込みが終わってからファイルを処理できるようになるまでの待ち時間(ミリ秒)。デフォルトの 0 の場合、コネクターは経過時間に関係なくすべてのファイルを処理します。

  • 型: long
  • デフォルト: 0
  • 重要度: 低

接続の詳細(Connection details)

batch.size

各バッチで返されるレコード数。

  • 型: int
  • デフォルト: 1000
  • 重要度: 低
empty.poll.wait.ms

ポーリングで空のレコードリストが返された場合の待ち時間。

  • 型: long
  • デフォルト: 250
  • 重要度: 低

スキーマ(Schema)

key.schema

Kafka に書き込まれるキーのスキーマ。

  • 型: string
  • 重要度: 高
value.schema

Kafka に書き込まれる値のスキーマ。

  • 型: string
  • 重要度: 高

スキーマ生成(Schema Generation)

schema.generation.enabled

スキーマが動的に生成されるかどうかを決定するフラグ。true を設定した場合は、key.schemavalue.schema を省略できますが、schema.generation.key.nameschema.generation.value.name は設定する必要があります。

  • 型: boolean
  • 重要度: 中
schema.generation.key.fields

キースキーマのビルドに使用されるフィールド。これは、スキーマ生成時にのみ使用されます。

  • 型: list
  • 重要度: 中
schema.generation.key.name

生成されるキースキーマの名前。

  • 型: string
  • 重要度: 中
schema.generation.value.name

生成される値スキーマの名前。

  • 型: string
  • 重要度: 中

タイムスタンプ(Timestamps)

timestamp.mode

コネクターで ConnectRecord にタイムスタンプがどのように設定されるかを決定します。FIELD を設定した場合は、値のフィールドからタイムスタンプが読み取られます。このフィールドは、省略可能とすることはできません。また、タイムスタンプであることが必要です。フィールドは、timestamp.field で指定します。FILE_TIME を設定した場合は、ファイルの最終更新時刻が使用されます。PROCESS_TIME を設定した場合は、レコードが読み取られた時刻が使用されます。

  • 型: string
  • 重要度: 中
timestamp.field

値スキーマ内で、対象となるレコードの解析されたタイムスタンプを含むフィールド。このフィールドは、省略可能とすることはできません。また、[Timestamp](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.html)であることが必要です。

  • 型: string
  • 重要度: 中
parser.timestamp.timezone

すべての日付の解析で使用されるタイムゾーン。

  • 型: string
  • 重要度: 低
parser.timestamp.date.formats

ファイルで想定される日付フォーマット。これは、日付フィールドを順番に解析するために使用される文字列のリストです。最も正確な日付フォーマットをリストの先頭にする必要があります。詳細については、Java のドキュメント(https://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html)を参照してください。

  • 型: list
  • 重要度: 低

CSV 解析(CSV Parsing)

csv.skip.lines

ファイルの先頭でスキップする行数。

  • 型: int
  • デフォルト: 0
  • 重要度: 低
csv.separator.char

各フィールドを区切る文字を整数で指定します。CSV では通常、「,」(44)の文字です。TSV ではタブ(9)文字を使用します。csv.separator.char が null(0)として定義されている場合、デフォルトで RFC 4180 パーサーを使用する必要があります。これは、csv.rfc.4180.parser.enabled = true の設定と同等です。

  • 型: int
  • デフォルト: 44
  • 重要度: 低
csv.quote.char

フィールドをクォートするために使用される文字。CSV では通常、「"」(34)の文字です。通常、csv.separator.char に指定する文字がデータ内にある場合に使用されます。

  • 型: int
  • デフォルト: 34
  • 重要度: 低
csv.escape.char

特殊文字が出現した場合に使用する文字を整数で設定します。通常のデフォルトのエスケープ文字は「」(92)です。

  • 型: int
  • デフォルト: 92
  • 重要度: 低
csv.strict.quotes

厳密なクォートの設定を指定します。true の場合、引用符の外側の文字は無視されます。

  • 型: string
  • デフォルト: false
  • 重要度: 低
csv.ignore.leading.whitespace

先頭の空白文字を無視する設定を指定します。true の場合、フィールド内の引用符の前にある空白文字は無視されます。

  • 型: string
  • 重要度: 低
csv.ignore.quotations

引用符を無視するモードを設定します。true の場合引用符は無視されます。

  • 型: string
  • デフォルト: false
  • 重要度: 低
csv.keep.carriage.return

行末のキャリッジリターンを維持するかどうかを決定するフラグ。

  • 型: string
  • デフォルト: false
  • 重要度: 低
csv.null.field.indicator

CSV リーダーでフィールドが null かどうかを判断する方法を決定するインジケーター。指定可能な値は、EMPTY_SEPARATORS、EMPTY_QUOTES、BOTH、NEITHER です。詳細については、http://opencsv.sourceforge.net/apidocs/com/opencsv/enums/CSVReaderNullFieldIndicator.html を参照してください。

  • 型: string
  • デフォルト: NEITHER
  • 重要度: 低
csv.first.row.as.header

データの先頭行にファイルのヘッダーが含まれているかどうかを示すフラグ。true の場合、列の配置は、CSV の先頭行によって決まります。列位置は、value.schema で指定されたスキーマの位置から推測されます。true に設定した場合、列数が、スキーマのフィールド数以上であることが必要です。

  • 型: string
  • 重要度: 中
csv.file.charset

ファイルを読み取るための文字セット。

  • 型: string
  • デフォルト: UTF-8
  • 重要度: 低
ui.csv.pre.validate.file.enabled

Flag to enable validating the integrity of all records in the CSV file before processing any of its records. For example, if any of the records have a linefeed within an unquoted field, which would incorrectly break the record at that point, then the entire fil will be considered erroneous and no records from that file will be processed. The failed file would be moved to the configured error path. Important: If the number of records in a file is larger than the configured batch size, then portions of the file may be retrieved from the sftp server by the connector more than once.

  • 型: string
  • Default: NO
  • Valid Values: NO, YES
  • 重要度: 低

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1,...,1]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png