重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
SFTP Sink Connector for Confluent Cloud¶
注釈
Confluent Platform 用のコネクターをローカルにインストールする場合は、「SFTP Sink Connector for Confluent Platform」を参照してください。
マネージド型 SFTP Sink Connector for Confluent Cloud を使用して、Apache Kafka® トピックから SFTP ディレクトリのファイルにデータをエクスポートできます。
SFTP Sink Connector は、Kafka からデータを定期的にポーリングし、そのデータを SFTP ファイルに書き込みます。時間ベースのパーティショナーによって、すべての Kafka パーティションのデータがチャンクに分割されます。データの各チャンクはファイルとして表されます。ファイル名で、そのデータチャンクのトピック、Kafka パーティション、開始オフセットがエンコードされます。各データチャンクのサイズは、書き込まれるレコード数およびスキーマの互換性によって決まります。
機能¶
SFTP Sink Connector は、次の機能をサポートしています。
- 厳密に 1 回のデリバリー: 決定的パーティショナーを使用してエクスポートされたレコードは、"厳密に 1 回" のセマンティクスで配信されます。
- パーティショナー: このコネクターは、Kafka クラスの
TimeStamp
に基づいたTimeBasedPartitioner
クラスをサポートします。時間ベースのパーティショニングの選択肢は、日次または毎時です。 - 複数のタスクのサポート: 1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。
- サポートされるデータフォーマット: このコネクターは入力データフォーマットとして、Avro、JSON スキーマ(JSON-SR)、Protobuf、JSON(スキーマレス)、および Bytes をサポートします。出力フォーマットとしては Avro と JSON 出力フォーマットがサポートされます。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマ、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
制限¶
以下の情報を確認してください。
- コネクターの制限事項については、SFTP Sink Connector の制限事項を参照してください。
- 1 つ以上の Single Message Transforms(SMT)を使用する場合は、「SMT の制限」を参照してください。
- Confluent Cloud Schema Registry を使用する場合は、「スキーマレジストリ Enabled Environments」を参照してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud SFTP Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、SFTP ディレクトリにイベントをストリーミングするようにコネクターを構成するための基本的な方法について示します。
- 前提条件
- アマゾンウェブサービス ( AWS )、Microsoft Azure ( Azure )、または Google Cloud Platform ( GCP )上の Confluent Cloud クラスターへのアクセスを許可されていること。
- Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
- SFTP ホストへのアクセス。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。
- シンクコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Quick Start for Confluent Cloud」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
ステップ 4: コネクターの詳細情報を入力します。¶
注釈
- すべての 前提条件 を満たしていることを確認してください。
- アスタリスク( * )は必須項目であることを示しています。
Add SFTP Sink Connector 画面で、以下を実行します。
既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。
新しいトピックを作成するには、+Add new topic をクリックします。
- Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。以下のいずれかのオプションを選択できます。
- Global Access: コネクターは、ユーザーがアクセス権限を持つすべての対象にアクセスできます。グローバルアクセスの場合、コネクターのアクセス権限は、ユーザーのアカウントにリンクされます。このオプションは本稼働環境では推奨されません。
- Granular access: コネクターのアクセスが制限されます。コネクターのアクセス権限は サービスアカウント から制御できます。本稼働環境にはこのオプションをお勧めします。
- Use an existing API key: 保存済みの API キーおよびシークレット部分を入力できます。API キーとシークレットを入力するか Cloud Console でこれらを生成することもできます。
- Continue をクリックします。
- SFTP Details に詳細情報を入力します。
- SFTP Host: SFTP サーバーのホストアドレスを入力します。たとえば、
192.168.1.231
のようになります。 - SFTP Port: 使用する SFTP ホストポートを入力します。ポート番号を入力しない場合は、
22
がデフォルトとして使用されます。 - Username: コネクターがホストに接続するのに使用するユーザー名を入力します。
- Password: 入力したユーザー名のパスワードを入力します。PEM ファイルを使用してホストに対するキーベースの認証を行う場合、パスワードは不要です。
- PEM File: TLS を使用する場合、SFTP ユーザーのプライベートキーが含まれる PEM ファイルをアップロードします。
- TLS passphrase: プライベートキーが暗号化されている場合、プライベートキーを復号化するためのパスフレーズを入力します。
- SFTP Host: SFTP サーバーのホストアドレスを入力します。たとえば、
- Continue をクリックします。
注釈
Cloud Console に表示されない構成プロパティでは、デフォルト値が使用されます。すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。
Input Kafka record value で、Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を AVRO、JSON_SR(JSON スキーマ)、PROTOBUF、JSON(スキーマレス)、または BYTES から選択します。スキーマベースのメッセージフォーマット(AVRO、JSON_SR(JSON スキーマ)、PROTOBUF など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。
Output Kafka record value で、Kafka 出力レコード値のフォーマット(コネクターから送られるデータ)を AVRO または JSON から選択します。
Time interval で、メッセージをファイルシステムに分ける時間間隔を選択します。たとえば、
Hourly
を選択すると、1 時間ごとに、メッセージがフォルダーに分けられてデータがファイルシステムにストリーミングされます。Flush size に入力します。この値のデフォルトは 1000 です。デフォルト値は増やす(専用クラスターを実行する場合は減らす)ことができます。熟練したユーザーは、以下をクリックして、コネクターがレコードを S3 にフラッシュする方法を定義できます。
Show advanced configurations
Topic directory (省略可): これは格納するデータに使用する最上位ディレクトリの名前です。使用しない場合は、
topics
がデフォルトとして使用されます。SFTP Working Directory (省略可): コネクターによる書き込み先となる最上位ディレクトリのパス。デフォルトは
/home/${sftp.username}
です。Path format (省略可): これによって、ファイルシステムに作成される時間ベースのパーティション分割のパスを構成できます。このプロパティでは、UNIX のタイムスタンプが日付フォーマットの文字列に変換されます。このプロパティを使用しない場合は、
'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
(Time interval に Hourly を選択した場合)または'year'=YYYY/'month'=MM/'day'=dd
(Time interval に Daily を選択した場合)がデフォルトとして使用されます。
注釈
Path format、Topic directory、および Time interval の各プロパティを使用して、格納するデータのディレクトリ構造を作成できます。例: Time interval を HOURLY、Topics directory を
json_logs/hourly
、および Path format を'dt'=YYYY-MM-dd/'hr'=HH
に設定します。この場合のディレクトリ構造は、filesystem://store-name/json_logs/hourly/<Topic-Name>/dt=2020-02-06/hr=09/<files>
になります。Maximum span of record time (in ms) before scheduled rotation (省略可):
rotate.schedule.interval.ms
を使用して、定期的にファイルを閉じ、ストレージにアップロードします。その際、レコードの時刻ではなく、現在時刻が使用されます。省略可能な 2 つの rotation interval プロパティの詳細については、「構成プロパティ」を参照してください。Compression Type (オプション): ファイルの圧縮タイプを選択します。AVRO 出力に
deflate
、snappy
、またはbzip2
を使用します。JSON 出力にgzip
を使用します。Maximum span of record time (in ms) before rotation (省略可): コネクターのローテーションの間隔では、ファイルを開いてレコードの書き込みができる状態にしておく期間の最大値をミリ秒で指定します。つまり、
rotate.interval.ms
を使用すると、各ファイルのタイムスタンプの起点は、ファイルに挿入された最初のレコードのタイムスタンプとなります。Timestamp field name: タイムスタンプとして使用するレコードフィールドです。これが時間ベースパーティショナーで使用されます。使用しない場合は、Kafka レコードを Kafka ブローカーが生成または格納したタイムスタンプがデフォルトとして使用されます。
Timezone: 有効なタイムゾーン を使用します。使用しない場合は、
UTC
がデフォルト設定されます。Locale: ロケールを選択します。デフォルトは
en
です。
Continue をクリックします。
選択するトピックのパーティション数に基づいて、推奨タスク数が表示されます。1 つのタスクで処理できるパーティションの数は最大 100 個です。
- 推奨されたタスク数を変更するには、Tasks フィールドに、コネクターで使用する タスク の数を入力します。
- Continue をクリックします。
注釈
このコネクターは現在、Single Message Transforms(SMT) をサポートしていません。
構成の概要を確認し、接続の詳細情報を確認します。.
Launch をクリックします。
コネクターのステータスが Provisioning から Running に変わります。
ステップ 5: ファイルを確認します。¶
レコードが SFTP ホストに生成されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
Confluent CLI の使用¶
Confluent CLI でコネクターをセットアップして実行するには、次の手順を実行します。
注釈
すべての 前提条件 を満たしていることを確認します。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe SftpSink
出力例:
Following are the required configs:
connector.class: SftpSink
topics
input.data.format
name
kafka.api.key
kafka.api.secret
sftp.host
sftp.username
output.data.format
time.interval
rotate.schedule.interval.ms
rotate.interval.ms
tasks.max
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"connector.class": "SftpSink",
"topics": "orders",
"input.data.format": "JSON",
"name": "SftpSinkConnector_0",
"kafka.api.key": "****************",
"kafka.api.secret": "*********************************",
"sftp.host": "192.168.1.231",
"sftp.username": "connect-user",
"sftp.password:": "****************",
"output.data.format": "JSON",
"time.interval": "HOURLY",
"rotate.schedule.interval.ms": "",
"rotate.interval.ms": "",
"tasks.max": "1",
}
以下のプロパティ定義に注意してください。
"connector.class"
: コネクターのプラグイン名を指定します。"topics"
: 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。input.data.format
:(Kafka トピックから送られるデータ)を AVRO、PROTOBUF、JSON_SR(JSON スキーマ)、JSON(スキーマレス)または BYTES を設定します。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。"name"
: 新しいコネクターの名前を設定します。"kafka.api.key"
および"kafka.api.secret"
: これらの認証情報として、クラスター API キーとシークレットを使用するか、サービスアカウント の API キーとシークレットを使用します。"sftp.host"
: SFTP サーバーのホストアドレスを入力します。たとえば、192.168.1.231
のようになります。ポートのデフォルト値は22
です。これを変更するには、プロパティ"sftp.port"
を追加します。"sftp.username"
: コネクターがホストに接続するのに使用するユーザー名を入力します。"output.data.format"
: AVRO または JSON(スキーマレス)を入力します。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。"time.interval"
: メッセージをファイルシステムに分ける時間間隔を設定します。指定可能なオプションは、HOURLY または DAILY です。"rotate.schedule.interval.ms"
および"rotate.interval.ms"
: これらのプロパティの説明については、「構成プロパティ」を参照してください。"tasks.max"
: このコネクターで使用できる タスク の最大数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります。
注釈
path.format
、topics.dir
、および time.interval
の各プロパティを使用して、格納するデータのディレクトリ構造を作成できます。たとえば、time.interval
を HOURLY、topics.dir
を json_logs/hourly
、および path.format` を ``'dt'=YYYY-MM-dd/'hr'=HH
に設定します。この場合のディレクトリ構造は、filesystem://store-name/json_logs/hourly/<Topic-Name>/dt=2020-02-06/hr=09/<files>
になります。プロパティの値と定義については、「構成プロパティ」を参照してください。
Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。
ステップ 3: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config sftp-sink-config.json
出力例:
Created connector SftpSinkConnector_0 lcc-do6vzd
ステップ 4: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type | Trace
+------------+-------------------------------+---------+------+-------+
lcc-do6vzd | SftpSinkConnector_0 | RUNNING | sink | |
ステップ 5: ファイルを確認します。¶
レコードが SFTP ホストに生成されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
このコネクターでは、以下のコネクター構成プロパティを使用します。
データの取得元とするトピック(Which topics do you want to get data from?)¶
topics
特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
- 型: list
- 重要度: 高
入力メッセージ(Input messages)¶
input.data.format
Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、JSON、または BYTES です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。
- 型: string
- 重要度: 高
データへの接続方法(How should we connect to your data?)¶
name
コネクターの名前を設定します。
- 型: string
- 指定可能な値: 最大 64 文字の文字列
- 重要度: 高
Kafka クラスターの認証情報(Kafka Cluster credentials)¶
kafka.auth.mode
Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。
- 型: string
- デフォルト: KAFKA_API_KEY
- 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
SFTP の詳細(SFTP Details)¶
sftp.host
SFTP サーバーのホストアドレス。
- 型: string
- 重要度: 高
sftp.port
SFTP サーバーのポート番号。
- 型: int
- デフォルト: 22
- 重要度: 中
sftp.username
SFTP 接続のユーザー名。
- 型: string
- 重要度: 高
sftp.password
SFTP 接続のパスワード(TLS を使用する場合は不要)。
- 型: password
- 重要度: 高
tls.pemfile
TLS による認証時に使用される PEM ファイル。
- 型: password
- 重要度: 高
tls.passphrase
指定したプライベートキーが暗号化されている場合にプライベートキーの復号化に使用するパスフレーズ。
- 型: password
- 重要度: 高
sftp.working.dir
コネクターによる書き込み先となる最上位ディレクトリのパス(デフォルト: /home/${sftp.username})。
- 型: string
- デフォルト: /home/${sftp.username}
- 重要度: 中
出力メッセージ(Output messages)¶
output.data.format
値の出力メッセージフォーマットを設定します。指定可能なエントリは、AVRO または JSON です。スキーマベースのメッセージフォーマット(AVRO など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。出力メッセージフォーマットのデフォルトは Input Message Format フィールドの値です。入力メッセージフォーマットとして PROTOBUF または JSON_SR が選択されている場合、ここでフォーマットを明示的に選択する必要があります。このプロパティの値が指定されていない場合、'input.data.format' プロパティに指定されている値が使用されます。
- 型: string
- 重要度: 高
データ編成の基準(Organize my data by...)¶
topics.dir
取り込んだデータが格納される最上位ディレクトリ。
- 型: string
- デフォルト: topics
- 重要度: 高
path.format
TimeBasedPartitioner を使用してパーティション分割を行う場合に、この構成を使用して、データディレクトリのフォーマットを設定します。この構成で設定したフォーマットに従って、UNIX のタイムスタンプが有効なディレクトリ文字列に変換されます。filesystem://store-name/json_logs/daily/<Topic-Name>/dt=2020-02-06/hr=09/<files> のようにファイルを編成するには、topic.dir=json_logs/daily、path.format='dt'=YYYY-MM-dd/'hr'=HH、および time.interval=HOURLY プロパティを使用します。
- 型: string
- デフォルト: 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
- 重要度: 高
time.interval
ストレージに取り込まれた時間に応じた、データのパーティショニング間隔。
- 型: string
- 重要度: 高
rotate.schedule.interval.ms
スケジュールによるローテーションでは、rotate.schedule.interval.ms を使用して定期的にファイルを閉じ、ストレージにアップロードします。その際、レコードの時刻ではなく、現在時刻が使用されます。rotate.schedule.interval.ms の設定は非決定的であり、"厳密に 1 回" の保証は無効になります。最小値は 600000 ミリ秒(10 分)です。
- 型: int
- デフォルト: -1
- 重要度: 中
rotate.interval.ms
コネクターのローテーションの間隔では、ファイルを開いてレコードを書き込みできる状態にしておく期間の最大値(ミリ秒)を指定します。つまり、rotate.interval.ms を使用する場合、各ファイルのタイムスタンプはファイルに挿入された最初のレコードのタイムスタンプから開始します。次のレコードのタイムスタンプが最初のレコードのタイムスタンプのファイルの rotate.interval の期間に収まらない場合、コネクターはファイルを閉じて Blob Storage にアップロードします。コネクターで処理するレコードがそれ以上ない場合、次のレコードを処理できるようになるまでの間、コネクターはファイルを開いたままにすることがあります(長時間になる可能性があります)。最小値は 600000 ミリ秒(10 分)です。このプロパティの値が指定されていない場合、'time.interval' プロパティに指定されている値が使用されます。
- 型: int
- 重要度: 高
flush.size
ファイルのコミットを呼び出す前にストレージに書き込まれるレコードの数。
- 型: int
- デフォルト: 1000
- 重要度: 高
timestamp.field
TimeBasedPartitioner に使用されるタイムスタンプを含むフィールドを設定します。
- 型: string
- デフォルト: ""
- 重要度: 高
timezone
TimeBasedPartitioner で使用されるタイムゾーンを設定します。
- 型: string
- デフォルト: UTC
- 重要度: 高
locale
TimeBasedPartitioner で使用するロケールを設定します。
- 型: string
- デフォルト: en
- 重要度: 高
compression.codec
ファイルで使用する圧縮タイプ。出力フォーマットが AVRO の場合は「deflate」、「snappy」、「bzip2」、JSON の場合は「gzip」を使用できます。
- 型: string
- 重要度: 高
value.converter.connect.meta.data
Connect コンバーターの有効化と無効化を切り替えて、メタデータを出力スキーマに追加するかどうかを指定します。
- 型: boolean
- デフォルト: true
- 重要度: 中
このコネクターのタスク数(Number of tasks for this connector)¶
tasks.max
- 型: int
- 指定可能な値: [1,...]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。