SFTP Sink Connector for Confluent Cloud¶
注釈
If you are installing the connector locally for Confluent Platform, see SFTP Sink Connector for Confluent Platform.
マネージド型 SFTP Sink Connector for Confluent Cloud を使用して、Apache Kafka® トピックから SFTP ディレクトリのファイルにデータをエクスポートできます。
SFTP Sink Connector は、Kafka からデータを定期的にポーリングし、そのデータを SFTP ファイルに書き込みます。時間ベースのパーティショナーによって、すべての Kafka パーティションのデータがチャンクに分割されます。データの各チャンクはファイルとして表されます。ファイル名で、そのデータチャンクのトピック、Kafka パーティション、開始オフセットがエンコードされます。各データチャンクのサイズは、書き込まれるレコード数およびスキーマの互換性によって決まります。
機能¶
SFTP Sink Connector は、次の機能をサポートしています。
- 厳密に 1 回のデリバリー: 決定的パーティショナーを使用してエクスポートされたレコードは、"厳密に 1 回" のセマンティクスで配信されます。
- パーティショナー: このコネクターは、Kafka クラスの
TimeStamp
に基づいたTimeBasedPartitioner
クラスをサポートします。時間ベースのパーティショニングの選択肢は、日次または毎時です。 - 複数のタスクのサポート: 1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。
- サポートされるデータフォーマット: このコネクターは入力データフォーマットとして、Avro、JSON スキーマ(JSON-SR)、Protobuf、JSON(スキーマレス)、および Bytes をサポートします。出力フォーマットとしては Avro と JSON 出力フォーマットがサポートされます。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマ、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。
構成プロパティの値と説明については、「構成プロパティ」を参照してください。
「Cloud コネクターの制限事項」も参照してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud SFTP Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、SFTP ディレクトリにイベントをストリーミングするようにコネクターを構成するための基本的な方法について示します。
- 前提条件
- アマゾンウェブサービス ( AWS )、Microsoft Azure ( Azure )、または Google Cloud Platform ( GCP )上の Confluent Cloud クラスターへのアクセスを許可されていること。
- Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
- SFTP ホストへのアクセス。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。
- シンクコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Confluent Cloud を使用した Apache Kafka のクイックスタート」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
Step 4: Enter the connector details.¶
注釈
- Ensure you have all your prerequisites completed.
- アスタリスク( * )は必須項目であることを示しています。
At the Add SFTP Sink Connector screen, complete the following:
If you've already populated your Kafka topics, select the topic(s) you want to connect from the Topics list.
To create a new topic, click +Add new topic.
- Select the way you want to provide Kafka Cluster credentials. You can
choose one of the following options:
- Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
- Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
- Use an existing API key: Allows you to enter an API key and secret part you have stored. You can enter an API key and secret (or generate these in the Cloud Console).
- Click Continue.
- SFTP Details に詳細情報を入力します。
- SFTP Host: SFTP サーバーのホストアドレスを入力します。たとえば、
192.168.1.231
のようになります。 - SFTP Port: 使用する SFTP ホストポートを入力します。ポート番号を入力しない場合は、
22
がデフォルトとして使用されます。 - Username: コネクターがホストに接続するのに使用するユーザー名を入力します。
- Password: 入力したユーザー名のパスワードを入力します。PEM ファイルを使用してホストに対するキーベースの認証を行う場合、パスワードは不要です。
- PEM File: TLS を使用する場合、SFTP ユーザーのプライベートキーが含まれる PEM ファイルをアップロードします。
- TLS passphrase: プライベートキーが暗号化されている場合、プライベートキーを復号化するためのパスフレーズを入力します。
- SFTP Host: SFTP サーバーのホストアドレスを入力します。たとえば、
- Click Continue.
注釈
Configuration properties that are not shown in the Cloud Console use the default values. See 構成プロパティ for all property values and descriptions.
Select the Input Kafka record value format (data coming from the Kafka topic): AVRO, JSON_SR (JSON Schema), PROTOBUF, JSON (schemaless), or BYTES. A valid schema must be available in Schema Registry to use a schema-based message format (for example, AVRO, JSON_SR (JSON Schema), or PROTOBUF).
Select the Output Kafka record value format (data coming from the connector): AVRO or JSON.
Select the Time interval that sets how you want your messages grouped in the file system. For example, if you select
Hourly
, messages are grouped into folders for each hour data is streamed to the file system.Enter the Flush size. This value defaults to 1000. The default value can be raised (and lowered, if running a dedicated cluster). Advanced users may define how the connector flushes records to S3 by clicking the following:
Show advanced configurations
Topic directory (Optional): This is a top-level directory name to use for stored data. Defaults to
topics
if not used.SFTP Working Directory (Optional): Path of the top level directory where the connector should write to. Defaults to
/home/${sftp.username}
.Path format (Optional): This configures the time-based partitioning path created in the file system. The property converts the UNIX timestamp to a date format string. If not used, this property defaults to
'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
if an Hourly Time interval was selected or'year'=YYYY/'month'=MM/'day'=dd
if a Daily Time interval was selected.
注釈
Path format、Topic directory、および Time interval の各プロパティを使用して、格納するデータのディレクトリ構造を作成できます。例: Time interval を HOURLY、Topics directory を
json_logs/hourly
、および Path format を'dt'=YYYY-MM-dd/'hr'=HH
に設定します。この場合のディレクトリ構造は、filesystem://store-name/json_logs/hourly/<Topic-Name>/dt=2020-02-06/hr=09/<files>
になります。Maximum span of record time (in ms) before scheduled rotation (Optional): Uses
rotate.schedule.interval.ms
to close the file and upload to storage on a regular basis using the current time, rather than the record time. See 構成プロパティ for details about the two optional rotation interval properties.Compression Type (オプション): ファイルの圧縮タイプを選択します。AVRO 出力に
deflate
、snappy
、またはbzip2
を使用します。JSON 出力にgzip
を使用します。Maximum span of record time (in ms) before rotation (Optional): The connector's rotation interval specifies the maximum timespan (in milliseconds) a file can remain open and ready for additional records. In other words, when using
rotate.interval.ms
the timestamp for each files starts with the timestamp of the first record inserted in the file.Timestamp field name: The record field used for the timestamp, which is used with the time-base partitioner. If not used, this defaults to the timestamp when the Kafka record was produced or stored by the Kafka broker.
Timezone: Use a valid timezone. Defaults to
UTC
if not used.Locale: Select a locale. Defaults to
en
.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks. One task can handle up to 100 partitions.
- To change the number of recommended tasks, enter the number of tasks for the connector to use in the Tasks field.
- Click Continue.
注釈
This connector does not currently support Single Message Transforms (SMT).
Review the configuration summary and verify the connection details. .
Click Launch.
コネクターのステータスが Provisioning から Running に変わります。
ステップ 5: ファイルを確認します。¶
レコードが SFTP ホストに生成されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
Confluent CLI の使用¶
Confluent CLI でコネクターをセットアップして実行するには、次の手順を実行します。
注釈
すべての 前提条件 を満たしていることを確認します。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe SftpSink
出力例:
Following are the required configs:
connector.class: SftpSink
topics
input.data.format
name
kafka.api.key
kafka.api.secret
sftp.host
sftp.username
output.data.format
time.interval
rotate.schedule.interval.ms
rotate.interval.ms
tasks.max
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"connector.class": "SftpSink",
"topics": "orders",
"input.data.format": "JSON",
"name": "SftpSinkConnector_0",
"kafka.api.key": "****************",
"kafka.api.secret": "*********************************",
"sftp.host": "192.168.1.231",
"sftp.username": "connect-user",
"sftp.password:": "****************",
"output.data.format": "JSON",
"time.interval": "HOURLY",
"rotate.schedule.interval.ms": "",
"rotate.interval.ms": "",
"tasks.max": "1",
}
以下のプロパティ定義に注意してください。
"connector.class"
: コネクターのプラグイン名を指定します。"topics"
: 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。input.data.format
:(Kafka トピックから送られるデータ)を AVRO、PROTOBUF、JSON_SR(JSON スキーマ)、JSON(スキーマレス)または BYTES を設定します。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。"name"
: 新しいコネクターの名前を設定します。"kafka.api.key"
および"kafka.api.secret"
: これらの認証情報として、クラスター API キーとシークレットを使用するか、サービスアカウント の API キーとシークレットを使用します。"sftp.host"
: SFTP サーバーのホストアドレスを入力します。たとえば、192.168.1.231
のようになります。ポートのデフォルト値は22
です。これを変更するには、プロパティ"sftp.port"
を追加します。"sftp.username"
: コネクターがホストに接続するのに使用するユーザー名を入力します。"output.data.format"
: AVRO または JSON(スキーマレス)を入力します。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。"time.interval"
: メッセージをファイルシステムに分ける時間間隔を設定します。指定可能なオプションは、HOURLY または DAILY です。"rotate.schedule.interval.ms"
および"rotate.interval.ms"
: これらのプロパティの説明については、「構成プロパティ」を参照してください。"tasks.max"
: このコネクターで使用できる タスク の最大数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります。
注釈
path.format
、topics.dir
、および time.interval
の各プロパティを使用して、格納するデータのディレクトリ構造を作成できます。たとえば、time.interval
を HOURLY、topics.dir
を json_logs/hourly
、および path.format` を ``'dt'=YYYY-MM-dd/'hr'=HH
に設定します。この場合のディレクトリ構造は、filesystem://store-name/json_logs/hourly/<Topic-Name>/dt=2020-02-06/hr=09/<files>
になります。プロパティの値と定義については、「構成プロパティ」を参照してください。
Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
See 構成プロパティ for all property values and descriptions.
ステップ 3: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config sftp-sink-config.json
出力例:
Created connector SftpSinkConnector_0 lcc-do6vzd
ステップ 4: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type | Trace
+------------+-------------------------------+---------+------+-------+
lcc-do6vzd | SftpSinkConnector_0 | RUNNING | sink | |
ステップ 5: ファイルを確認します。¶
レコードが SFTP ホストに生成されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
Use the following configuration properties with this connector.
Which topics do you want to get data from?¶
topics
Identifies the topic name or a comma-separated list of topic names.
- Type: list
- Importance: high
Input messages¶
input.data.format
Sets the input Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, JSON or BYTES. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- Type: string
- Importance: high
How should we connect to your data?¶
name
Sets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: SERVICE_ACCOUNT, KAFKA_API_KEY
- Importance: high
kafka.api.key
- Type: password
- Importance: high
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- Type: string
- Importance: high
kafka.api.secret
- Type: password
- Importance: high
SFTP Details¶
sftp.host
Host address of the SFTP server.
- Type: string
- Importance: high
sftp.port
Port number of the SFTP server.
- Type: int
- デフォルト: 22
- Importance: medium
sftp.username
Username for the SFTP connection.
- Type: string
- Importance: high
sftp.password
Password for the SFTP connection (not required if using TLS).
- Type: password
- Importance: high
tls.pemfile
PEM file to be used for authentication via TLS.
- Type: password
- Importance: high
tls.passphrase
Passphrase that will be used to decrypt the private key if the given private key is encrypted.
- Type: password
- Importance: high
sftp.working.dir
Path of the top level directory where the connector should write to (defaults to /home/${sftp.username}). If no value for this property is provided, the value specified for the 'sftp.username' property is used.
- Type: string
- Importance: medium
Output messages¶
output.data.format
Set the output message format for values. Valid entries are AVRO, JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO. Note that the output message format defaults to the value in the Input Message Format field. If either PROTOBUF or JSON_SR is selected as the input message format, you should select one explicitly. If no value for this property is provided, the value specified for the 'input.data.format' property is used.
- Type: string
- Importance: high
Organize my data by...¶
topics.dir
Top-level directory where ingested data is stored.
- Type: string
- デフォルト: topics
- Importance: high
path.format
This configuration is used to set the format of the data directories when partitioning with TimeBasedPartitioner. The format set in this configuration converts the Unix timestamp to a valid directory string. To organize files like this example, filesystem://store-name/json_logs/daily/<Topic-Name>/dt=2020-02-06/hr=09/<files>, use the properties: topics.dir=json_logs/daily, path.format='dt'=YYYY-MM-dd/'hr'=HH, and time.interval=HOURLY.
- Type: string
- Default: 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
- Importance: high
time.interval
ストレージに取り込まれた時間に応じた、データのパーティショニング間隔。
- Type: string
- Importance: high
rotate.schedule.interval.ms
Scheduled rotation uses rotate.schedule.interval.ms to close the file and upload to storage on a regular basis using the current time, rather than the record time. Setting rotate.schedule.interval.ms is nondeterministic and will invalidate exactly-once guarantees. Minimum value is 600000ms (10 minutes).
- Type: int
- デフォルト: -1
- Importance: medium
rotate.interval.ms
The connector’s rotation interval specifies the maximum timespan (in milliseconds) a file can remain open and ready for additional records. In other words, when using rotate.interval.ms, the timestamp for each file starts with the timestamp of the first record inserted in the file. The connector closes and uploads a file to the blob store when the next record's timestamp does not fit into the file's rotate.interval time span from the first record's timestamp. If the connector has no more records to process, the connector may keep the file open until the connector can process another record (which can be a long time). Minimum value is 600000ms (10 minutes). If no value for this property is provided, the value specified for the 'time.interval' property is used.
- Type: int
- Importance: high
flush.size
Number of records written to storage before invoking file commits.
- Type: int
- デフォルト: 1000
- Importance: high
timestamp.field
Sets the field that contains the timestamp used for the TimeBasedPartitioner
- Type: string
- デフォルト: ""
- Importance: high
timezone
Sets the timezone used by the TimeBasedPartitioner.
- Type: string
- Default: UTC
- Importance: high
locale
Sets the locale to use with TimeBasedPartitioner.
- Type: string
- デフォルト: en
- Importance: high
compression.codec
Compression type for files. 'deflate', 'snappy' and 'bzip2' can be used when the output format is AVRO; 'gzip' can be used when the output format is JSON.
- Type: string
- Importance: high
value.converter.connect.meta.data
Toggle for enabling/disabling connect converter to add its meta data to the output schema or not.
- Type: boolean
- Default: true
- Importance: medium
Number of tasks for this connector¶
tasks.max
- Type: int
- Valid Values: [1,...]
- Importance: high
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。