重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

SFTP Sink Connector for Confluent Cloud

注釈

Confluent Platform 用のコネクターをローカルにインストールする場合は、「SFTP Sink Connector for Confluent Platform」を参照してください。

マネージド型 SFTP Sink Connector for Confluent Cloud を使用して、Apache Kafka® トピックから SFTP ディレクトリのファイルにデータをエクスポートできます。

SFTP Sink Connector は、Kafka からデータを定期的にポーリングし、そのデータを SFTP ファイルに書き込みます。時間ベースのパーティショナーによって、すべての Kafka パーティションのデータがチャンクに分割されます。データの各チャンクはファイルとして表されます。ファイル名で、そのデータチャンクのトピック、Kafka パーティション、開始オフセットがエンコードされます。各データチャンクのサイズは、書き込まれるレコード数およびスキーマの互換性によって決まります。

機能

SFTP Sink Connector は、次の機能をサポートしています。

  • 厳密に 1 回のデリバリー: 決定的パーティショナーを使用してエクスポートされたレコードは、"厳密に 1 回" のセマンティクスで配信されます。
  • パーティショナー: このコネクターは、Kafka クラスの TimeStamp に基づいた TimeBasedPartitioner クラスをサポートします。時間ベースのパーティショニングの選択肢は、日次または毎時です。
  • 複数のタスクのサポート: 1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。
  • サポートされるデータフォーマット: このコネクターは入力データフォーマットとして、Avro、JSON スキーマ(JSON-SR)、Protobuf、JSON(スキーマレス)、および Bytes をサポートします。出力フォーマットとしては Avro と JSON 出力フォーマットがサポートされます。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマ、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud SFTP Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、SFTP ディレクトリにイベントをストリーミングするようにコネクターを構成するための基本的な方法について示します。

前提条件
  • アマゾンウェブサービス ( AWS )、Microsoft Azure ( Azure )、または Google Cloud Platform ( GCP )上の Confluent Cloud クラスターへのアクセスを許可されていること。
  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
  • SFTP ホストへのアクセス。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。
  • シンクコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the SFTP Sink connector card.

SFTP Sink Connector Card

ステップ 4: コネクターの詳細情報を入力します。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク( * )は必須項目であることを示しています。

Add SFTP Sink Connector 画面で、以下を実行します。

既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。

新しいトピックを作成するには、+Add new topic をクリックします。

ステップ 5: ファイルを確認します。

レコードが SFTP ホストに生成されていることを確認します。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

Confluent CLI の使用

Confluent CLI でコネクターをセットアップして実行するには、次の手順を実行します。

注釈

すべての 前提条件 を満たしていることを確認します。

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe SftpSink

出力例:

Following are the required configs:
connector.class: SftpSink
topics
input.data.format
name
kafka.api.key
kafka.api.secret
sftp.host
sftp.username
output.data.format
time.interval
rotate.schedule.interval.ms
rotate.interval.ms
tasks.max

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "connector.class": "SftpSink",
  "topics": "orders",
  "input.data.format": "JSON",
  "name": "SftpSinkConnector_0",
  "kafka.api.key": "****************",
  "kafka.api.secret": "*********************************",
  "sftp.host": "192.168.1.231",
  "sftp.username": "connect-user",
  "sftp.password:": "****************",
  "output.data.format": "JSON",
  "time.interval": "HOURLY",
  "rotate.schedule.interval.ms": "",
  "rotate.interval.ms": "",
  "tasks.max": "1",
}

以下のプロパティ定義に注意してください。

  • "connector.class": コネクターのプラグイン名を指定します。
  • "topics": 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
  • input.data.format:(Kafka トピックから送られるデータ)を AVRO、PROTOBUF、JSON_SR(JSON スキーマ)、JSON(スキーマレス)または BYTES を設定します。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。
  • "name": 新しいコネクターの名前を設定します。
  • "kafka.api.key" および "kafka.api.secret": これらの認証情報として、クラスター API キーとシークレットを使用するか、サービスアカウント の API キーとシークレットを使用します。
  • "sftp.host": SFTP サーバーのホストアドレスを入力します。たとえば、192.168.1.231 のようになります。ポートのデフォルト値は 22 です。これを変更するには、プロパティ "sftp.port" を追加します。
  • "sftp.username": コネクターがホストに接続するのに使用するユーザー名を入力します。
  • "output.data.format": AVRO または JSON(スキーマレス)を入力します。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。
  • "time.interval": メッセージをファイルシステムに分ける時間間隔を設定します。指定可能なオプションは、HOURLY または DAILY です。
  • "rotate.schedule.interval.ms" および "rotate.interval.ms": これらのプロパティの説明については、「構成プロパティ」を参照してください。
  • "tasks.max": このコネクターで使用できる タスク の最大数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります。

注釈

path.formattopics.dir、および time.interval の各プロパティを使用して、格納するデータのディレクトリ構造を作成できます。たとえば、time.interval を HOURLY、topics.dirjson_logs/hourly、および path.format` ``'dt'=YYYY-MM-dd/'hr'=HH に設定します。この場合のディレクトリ構造は、filesystem://store-name/json_logs/hourly/<Topic-Name>/dt=2020-02-06/hr=09/<files> になります。プロパティの値と定義については、「構成プロパティ」を参照してください。

Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。

ステップ 3: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config sftp-sink-config.json

出力例:

Created connector SftpSinkConnector_0 lcc-do6vzd

ステップ 4: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID           |             Name              | Status  | Type | Trace
+------------+-------------------------------+---------+------+-------+
lcc-do6vzd   | SftpSinkConnector_0           | RUNNING | sink |       |

ステップ 5: ファイルを確認します。

レコードが SFTP ホストに生成されていることを確認します。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データの取得元とするトピック(Which topics do you want to get data from?)

topics

特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • 型: list
  • 重要度: 高

入力メッセージ(Input messages)

input.data.format

Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、JSON、または BYTES です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • 重要度: 高

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

SFTP の詳細(SFTP Details)

sftp.host

SFTP サーバーのホストアドレス。

  • 型: string
  • 重要度: 高
sftp.port

SFTP サーバーのポート番号。

  • 型: int
  • デフォルト: 22
  • 重要度: 中
sftp.username

SFTP 接続のユーザー名。

  • 型: string
  • 重要度: 高
sftp.password

SFTP 接続のパスワード(TLS を使用する場合は不要)。

  • 型: password
  • 重要度: 高
tls.pemfile

TLS による認証時に使用される PEM ファイル。

  • 型: password
  • 重要度: 高
tls.passphrase

指定したプライベートキーが暗号化されている場合にプライベートキーの復号化に使用するパスフレーズ。

  • 型: password
  • 重要度: 高
sftp.working.dir

コネクターによる書き込み先となる最上位ディレクトリのパス(デフォルト: /home/${sftp.username})。

  • 型: string
  • デフォルト: /home/${sftp.username}
  • 重要度: 中

出力メッセージ(Output messages)

output.data.format

値の出力メッセージフォーマットを設定します。指定可能なエントリは、AVRO または JSON です。スキーマベースのメッセージフォーマット(AVRO など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。出力メッセージフォーマットのデフォルトは Input Message Format フィールドの値です。入力メッセージフォーマットとして PROTOBUF または JSON_SR が選択されている場合、ここでフォーマットを明示的に選択する必要があります。このプロパティの値が指定されていない場合、'input.data.format' プロパティに指定されている値が使用されます。

  • 型: string
  • 重要度: 高

データ編成の基準(Organize my data by...)

topics.dir

取り込んだデータが格納される最上位ディレクトリ。

  • 型: string
  • デフォルト: topics
  • 重要度: 高
path.format

TimeBasedPartitioner を使用してパーティション分割を行う場合に、この構成を使用して、データディレクトリのフォーマットを設定します。この構成で設定したフォーマットに従って、UNIX のタイムスタンプが有効なディレクトリ文字列に変換されます。filesystem://store-name/json_logs/daily/<Topic-Name>/dt=2020-02-06/hr=09/<files> のようにファイルを編成するには、topic.dir=json_logs/daily、path.format='dt'=YYYY-MM-dd/'hr'=HH、および time.interval=HOURLY プロパティを使用します。

  • 型: string
  • デフォルト: 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
  • 重要度: 高
time.interval

ストレージに取り込まれた時間に応じた、データのパーティショニング間隔。

  • 型: string
  • 重要度: 高
rotate.schedule.interval.ms

スケジュールによるローテーションでは、rotate.schedule.interval.ms を使用して定期的にファイルを閉じ、ストレージにアップロードします。その際、レコードの時刻ではなく、現在時刻が使用されます。rotate.schedule.interval.ms の設定は非決定的であり、"厳密に 1 回" の保証は無効になります。最小値は 600000 ミリ秒(10 分)です。

  • 型: int
  • デフォルト: -1
  • 重要度: 中
rotate.interval.ms

コネクターのローテーションの間隔では、ファイルを開いてレコードを書き込みできる状態にしておく期間の最大値(ミリ秒)を指定します。つまり、rotate.interval.ms を使用する場合、各ファイルのタイムスタンプはファイルに挿入された最初のレコードのタイムスタンプから開始します。次のレコードのタイムスタンプが最初のレコードのタイムスタンプのファイルの rotate.interval の期間に収まらない場合、コネクターはファイルを閉じて Blob Storage にアップロードします。コネクターで処理するレコードがそれ以上ない場合、次のレコードを処理できるようになるまでの間、コネクターはファイルを開いたままにすることがあります(長時間になる可能性があります)。最小値は 600000 ミリ秒(10 分)です。このプロパティの値が指定されていない場合、'time.interval' プロパティに指定されている値が使用されます。

  • 型: int
  • 重要度: 高
flush.size

ファイルのコミットを呼び出す前にストレージに書き込まれるレコードの数。

  • 型: int
  • デフォルト: 1000
  • 重要度: 高
timestamp.field

TimeBasedPartitioner に使用されるタイムスタンプを含むフィールドを設定します。

  • 型: string
  • デフォルト: ""
  • 重要度: 高
timezone

TimeBasedPartitioner で使用されるタイムゾーンを設定します。

  • 型: string
  • デフォルト: UTC
  • 重要度: 高
locale

TimeBasedPartitioner で使用するロケールを設定します。

  • 型: string
  • デフォルト: en
  • 重要度: 高
compression.codec

ファイルで使用する圧縮タイプ。出力フォーマットが AVRO の場合は「deflate」、「snappy」、「bzip2」、JSON の場合は「gzip」を使用できます。

  • 型: string
  • 重要度: 高
value.converter.connect.meta.data

Connect コンバーターの有効化と無効化を切り替えて、メタデータを出力スキーマに追加するかどうかを指定します。

  • 型: boolean
  • デフォルト: true
  • 重要度: 中

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1,...]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png