SFTP Sink Connector for Confluent Platform

You can use the Kafka Connect SFTP Sink connector to export data from Apache Kafka® topics to files in an SFTP directory. Supported formats are CSV/TSV, Avro, JSON, and Parquet.

The SFTP Sink connector periodically polls data from Kafka and writes it to files on the SFTP server. A partitioner splits the data of every Kafka partition into chunks, and each chunk is written to a file. The file name encodes the topic, the Kafka partition, and the start offset of the data chunk.

If no partitioner is specified in the configuration, the default partitioner, which preserves Kafka partitioning, is used. The size of each data chunk is determined by the number of records written to each SFTP file (set with flush.size) and by schema compatibility.
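To make the chunking rule concrete, here is a minimal Java sketch of how records from one Kafka partition could be grouped into files. It assumes a chunk is closed after flush.size records or when the schema ID changes between records (the schema.compatibility=NONE behavior described under Schema Evolution). The names Rec and chunkStartOffsets are hypothetical, the sketch ignores any time-based rotation, and the connector itself may hold a partial last chunk open until it fills.

```java
import java.util.ArrayList;
import java.util.List;

class ChunkingSketch {
    // Hypothetical stand-in for a Kafka record: only the offset and the
    // schema ID matter for this sketch.
    static final class Rec {
        final long offset;
        final int schemaId;

        Rec(long offset, int schemaId) {
            this.offset = offset;
            this.schemaId = schemaId;
        }
    }

    // Returns the start offset of each chunk (one file per chunk) for a single
    // Kafka partition. A new chunk starts when the current one already holds
    // flushSize records or when the schema ID differs from the previous record's.
    static List<Long> chunkStartOffsets(List<Rec> records, int flushSize) {
        List<Long> starts = new ArrayList<>();
        int count = 0;          // records in the current chunk
        int currentSchema = 0;  // schema ID of the current chunk
        for (Rec r : records) {
            if (count == 0 || count == flushSize || r.schemaId != currentSchema) {
                starts.add(r.offset);
                count = 0;
                currentSchema = r.schemaId;
            }
            count++;
        }
        return starts;
    }

    public static void main(String[] args) {
        List<Rec> records = new ArrayList<>();
        for (long offset = 0; offset < 7; offset++) {
            records.add(new Rec(offset, 1));
        }
        // With flush.size=3, seven records start chunks at offsets 0, 3 and 6.
        System.out.println(chunkStartOffsets(records, 3));
    }
}
```

The start offsets returned here are the values that end up in the file names, which is why alternating schema IDs under NONE compatibility produce many small files.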

Features

The SFTP Sink connector offers a variety of features:

  • Exactly Once Delivery: Records that are exported using a deterministic partitioner are delivered with exactly-once semantics regardless of the eventual consistency of SFTP.
  • Pluggable Data Format with or without Schema: Out of the box, the connector supports writing data to SFTP files in Avro, CSV/TSV, JSON, and Parquet formats. Besides records with schema, the connector supports exporting plain JSON records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the Format interface.
  • Pluggable Partitioner: The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You can write your own partitioner by implementing the Partitioner interface. Additionally, you can customize time-based partitioning by extending the TimeBasedPartitioner class.
  • Schema Evolution: Schema evolution works only if the records are generated with the default naming strategy, TopicNameStrategy. An error may occur if other naming strategies are used, because the records are not guaranteed to be compatible with each other. If you use another naming strategy, set schema.compatibility to NONE. This may result in small files, because the sink connector creates a new file every time the schema ID changes between records. See Subject Name Strategy for more information about naming strategies.

Quick Start

This quick start uses the SFTP Sink connector to export data produced by the Avro console producer to an SFTP directory.

First, start all the necessary services using the Confluent CLI.

Tip

If not already in your PATH, add Confluent’s bin directory by running: export PATH=<path-to-confluent>/bin:$PATH

confluent local start

Every service will start in order, printing a message with its status:

Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

Next, start the Avro console producer to import a few records to Kafka:

  ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_sftp_sink \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

In the console producer, enter the following:

{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}

The three records entered are published to the Kafka topic test_sftp_sink in Avro format.

Before starting the connector, make sure that the configuration in etc/kafka-connect-sftp/quickstart-sftp.properties matches your SFTP setup (for example, sftp.host must point to your SFTP host). Then, start the connector by loading its configuration with the following command:

Caution

You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local load sftp-sink -- -d etc/kafka-connect-sftp/quickstart-sftp.properties

{
  "name": "sftp-sink",
  "config": {
    "topics": "test_sftp_sink",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.sftp.SftpSinkConnector",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "flush.size": "3",
    "schema.compatibility": "NONE",
    "format.class": "io.confluent.connect.sftp.sink.format.avro.AvroFormat",
    "storage.class": "io.confluent.connect.sftp.sink.storage.SftpSinkStorage",
    "sftp.host": "localhost",
    "sftp.port": "2222",
    "sftp.username": "foo",
    "sftp.password": "pass",
    "sftp.working.dir": "/share",
    "name": "sftpconnector"
  },
  "tasks": []
}
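As an alternative to the Confluent CLI, the same configuration can be submitted through the Kafka Connect REST API. The Java 11+ sketch below assumes a Connect worker listening on the default REST port at http://localhost:8083 and uses PUT /connectors/{name}/config, which creates the connector if it does not exist and updates its configuration otherwise. The class and method names are illustrative. The name entry is left out of the map because the worker takes the connector name from the URL, and the tiny JSON serializer only escapes backslashes and double quotes, which is enough for these values. Running main requires a live worker.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class ConnectRestSketch {
    // Quotes a string for JSON, escaping only backslashes and double quotes.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Serializes a flat String-to-String map as a JSON object.
    static String toJson(Map<String, String> config) {
        return config.entrySet().stream()
                .map(e -> quote(e.getKey()) + ":" + quote(e.getValue()))
                .collect(Collectors.joining(",", "{", "}"));
    }

    // PUT /connectors/{name}/config creates or updates the named connector.
    static HttpRequest buildRequest(String workerUrl, String name, Map<String, String> config) {
        return HttpRequest.newBuilder(URI.create(workerUrl + "/connectors/" + name + "/config"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(toJson(config)))
                .build();
    }

    // The quick-start settings shown above, minus "name".
    static Map<String, String> quickstartConfig() {
        Map<String, String> c = new LinkedHashMap<>();
        c.put("topics", "test_sftp_sink");
        c.put("tasks.max", "1");
        c.put("connector.class", "io.confluent.connect.sftp.SftpSinkConnector");
        c.put("confluent.topic.bootstrap.servers", "localhost:9092");
        c.put("partitioner.class", "io.confluent.connect.storage.partitioner.DefaultPartitioner");
        c.put("schema.generator.class", "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator");
        c.put("flush.size", "3");
        c.put("schema.compatibility", "NONE");
        c.put("format.class", "io.confluent.connect.sftp.sink.format.avro.AvroFormat");
        c.put("storage.class", "io.confluent.connect.sftp.sink.storage.SftpSinkStorage");
        c.put("sftp.host", "localhost");
        c.put("sftp.port", "2222");
        c.put("sftp.username", "foo");
        c.put("sftp.password", "pass");
        c.put("sftp.working.dir", "/share");
        return c;
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = buildRequest("http://localhost:8083", "sftp-sink", quickstartConfig());
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```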

To check that the connector started successfully, view the Connect worker’s log by entering:

confluent local log connect

Towards the end of the log you should see that the connector starts, logs a few messages, and then exports data from Kafka to SFTP. Once the connector finishes ingesting data to SFTP, check that the data is available in the SFTP working directory:

You should see a file named /topics/test_sftp_sink/partition=0/test_sftp_sink+0+0000000000.avro. The file name is encoded as topic+kafkaPartition+startOffset.format.
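If you need to recover the topic, partition, and start offset from such a name (for example, in a downstream job), a small parser is enough. This is a hypothetical helper, not part of the connector. It relies on Kafka topic names never containing "+", and it treats everything after the first "." that follows the offset as the format.

```java
class SftpFileNameParser {
    // The pieces encoded in a name such as test_sftp_sink+0+0000000000.avro.
    static final class Parts {
        final String topic;
        final int kafkaPartition;
        final long startOffset;
        final String format;

        Parts(String topic, int kafkaPartition, long startOffset, String format) {
            this.topic = topic;
            this.kafkaPartition = kafkaPartition;
            this.startOffset = startOffset;
            this.format = format;
        }
    }

    static Parts parse(String path) {
        // Only the last path segment carries the encoding.
        String name = path.substring(path.lastIndexOf('/') + 1);
        // The last two '+' signs delimit the partition and the start offset.
        int secondPlus = name.lastIndexOf('+');
        int firstPlus = name.lastIndexOf('+', secondPlus - 1);
        // The format begins at the first '.' after the start offset.
        int dot = secondPlus < 0 ? -1 : name.indexOf('.', secondPlus);
        if (firstPlus <= 0 || dot < 0) {
            throw new IllegalArgumentException("Unexpected file name: " + name);
        }
        return new Parts(
                name.substring(0, firstPlus),
                Integer.parseInt(name.substring(firstPlus + 1, secondPlus)),
                Long.parseLong(name.substring(secondPlus + 1, dot)),
                name.substring(dot + 1));
    }

    public static void main(String[] args) {
        Parts p = parse("/topics/test_sftp_sink/partition=0/test_sftp_sink+0+0000000000.avro");
        System.out.println(p.topic + " " + p.kafkaPartition + " " + p.startOffset + " " + p.format);
    }
}
```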

You can use avro-tools-1.8.2.jar (available from the metrocast.net Apache download mirror) to extract the content of the file.

Move avro-tools-1.8.2.jar to SFTP’s working directory and run the following command:

java -jar avro-tools-1.8.2.jar tojson /<working dir>/topics/test_sftp_sink/partition=0/test_sftp_sink+0+0000000000.avro

You should see the following output:

{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}

Finally, stop the Connect worker as well as all the rest of Confluent Platform by running:

confluent local stop

Your output should resemble:

Stopping control-center
control-center is [DOWN]
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]

Or, stop all the services and additionally wipe out any data generated during this quick start by running:

confluent local destroy

Your output should resemble:

Stopping control-center
control-center is [DOWN]
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE

SFTP File Names

The connector stores records in files, and the name of each file serves as its unique key. Generally, the hierarchy of directories and files in which records are stored follows this format:

<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>

where:

  • <prefix> is specified with the connector’s sftp.working.dir configuration property, which defaults to the literal value topics and helps ensure that file names do not clash with existing files in the same directory.
  • <topic> corresponds to the name of the Kafka topic from which the records were read.
  • <encodedPartition> is generated by the SFTP sink connector’s partitioner (see Partitioning Records into SFTP files).
  • <kafkaPartition> is the Kafka partition number from which the records were read.
  • <startOffset> is the Kafka offset of the first record written to this file.
  • <format> is the extension identifying the format in which the records are serialized in this SFTP file.
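Putting the pieces together, the path template can be expressed as a single format string. This Java sketch is illustrative (SftpPathSketch and its methods are hypothetical names). It zero-pads the start offset to 10 digits because that is what the quick-start file name shows, and it uses the default partitioner's partition=<kafkaPartition> encoding from that same example.

```java
import java.util.Locale;

class SftpPathSketch {
    // <encodedPartition> for the default partitioner, which keeps Kafka partitioning.
    static String defaultEncodedPartition(int kafkaPartition) {
        return "partition=" + kafkaPartition;
    }

    // <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
    static String filePath(String prefix, String topic, String encodedPartition,
                           int kafkaPartition, long startOffset, String format) {
        // Locale.ROOT keeps the digits ASCII regardless of the JVM's default locale.
        return String.format(Locale.ROOT, "%s/%s/%s/%s+%d+%010d.%s",
                prefix, topic, encodedPartition, topic, kafkaPartition, startOffset, format);
    }

    public static void main(String[] args) {
        System.out.println(filePath("topics", "test_sftp_sink", defaultEncodedPartition(0), 0, 0L, "avro"));
    }
}
```

Running main prints topics/test_sftp_sink/partition=0/test_sftp_sink+0+0000000000.avro, the quick-start file path relative to the working directory.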

Partitioning Records into SFTP files

The SFTP sink connector’s partitioner determines how records read from a Kafka topic are partitioned into SFTP directories. The partitioner determines the <encodedPartition> portion of the SFTP file names (see SFTP File Names).

The partitioner is specified in the connector configuration with the partitioner.class configuration property. The SFTP sink connector comes with the following partitioners:

  • Default (Kafka) Partitioner: The io.confluent.connect.storage.partitioner.DefaultPartitioner preserves the same topic partitions as in Kafka, and records from each topic partition end up in SFTP files whose names include the Kafka topic and Kafka partition. The <encodedPartition> is always partition=<kafkaPartition>, resulting in SFTP file names of the form <prefix>/<topic>/partition=<kafkaPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>.
  • Field Partitioner: The io.confluent.connect.storage.partitioner.FieldPartitioner determines the partition from the field within each record identified by the connector’s partition.field.name configuration property, which has no default. This partitioner requires STRUCT record type values. The <encodedPartition> is always <fieldName>=<fieldValue>, resulting in SFTP file names of the form <prefix>/<topic>/<fieldName>=<fieldValue>/<topic>+<kafkaPartition>+<startOffset>.<format>.
  • Time-Based Partitioner: The io.confluent.connect.storage.partitioner.TimeBasedPartitioner determines the partition from the year, month, day, hour, minutes, and/or seconds. This partitioner requires the following connector configuration properties:
    • The path.format configuration property specifies the pattern used for the <encodedPartition> portion of the SFTP file name. For example, when path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH, SFTP file names will have the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>.
    • The partition.duration.ms configuration property defines the maximum granularity of the SFTP files within a single encoded partition directory. For example, setting partition.duration.ms=600000 (10 minutes) will result in each SFTP file in that directory having no more than 10 minutes of records.
    • The locale configuration property specifies the JDK’s locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, fr-FR for French (in France). These may vary by Java version; see the available locales.
    • The timezone configuration property specifies the timezone used to interpret dates and times. Use standard short names for timezones such as UTC or (without daylight saving time) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the “en_US” locale.
    • The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field configuration property.
  • Daily Partitioner: The io.confluent.connect.storage.partitioner.DailyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd and partition.duration.ms=86400000 (one day, for one SFTP file in each daily directory). This partitioner always results in SFTP file names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/<topic>+<kafkaPartition>+<startOffset>.<format>. This partitioner requires the following connector configuration properties:
    • The locale configuration property specifies the JDK’s locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, fr-FR for French (in France). These may vary by Java version; see the available locales.
    • The timezone configuration property specifies the timezone used to interpret dates and times. Use standard short names for timezones such as UTC or (without daylight saving time) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the “en_US” locale.
    • The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field configuration property.
  • Hourly Partitioner: The io.confluent.connect.storage.partitioner.HourlyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH and partition.duration.ms=3600000 (one hour, for one SFTP file in each hourly directory). This partitioner always results in SFTP file names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>. This partitioner requires the following connector configuration properties:
    • The locale configuration property specifies the JDK’s locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, fr-FR for French (in France). These may vary by Java version; see the available locales.
    • The timezone configuration property specifies the timezone used to interpret dates and times. Use standard short names for timezones such as UTC or (without daylight saving time) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the “en_US” locale.
    • The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field configuration property.
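The time-based partitioners can be approximated with a few lines of java.time code. This sketch assumes that the connector floors each timestamp to a multiple of partition.duration.ms in the configured timezone and then formats the start of that bucket with path.format. Class, method, and constant names are illustrative. In java.time the pattern letter Y means week-based year, which can differ from the calendar year around New Year, so the sketch writes the documented YYYY as yyyy; the DAILY_* and HOURLY_* constants restate the Daily and Hourly partitioner settings in that form.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

class TimeBasedPathSketch {
    static final String DAILY_FORMAT = "'year'=yyyy/'month'=MM/'day'=dd";
    static final long DAILY_MS = 86_400_000L;
    static final String HOURLY_FORMAT = "'year'=yyyy/'month'=MM/'day'=dd/'hour'=HH";
    static final long HOURLY_MS = 3_600_000L;

    static String encodedPartition(long timestampMs, long durationMs, String pattern,
                                   ZoneId zone, Locale locale) {
        // Shift to local wall-clock milliseconds so buckets start at local midnight or hour.
        ZoneOffset offset = zone.getRules().getOffset(Instant.ofEpochMilli(timestampMs));
        long localMs = timestampMs + offset.getTotalSeconds() * 1000L;
        // Floor to the start of the partition.duration.ms bucket.
        long bucketStartMs = Math.floorDiv(localMs, durationMs) * durationMs;
        // Interpret the shifted value as a wall-clock time and format it with path.format.
        LocalDateTime bucketStart = LocalDateTime.ofEpochSecond(
                Math.floorDiv(bucketStartMs, 1000L),
                (int) Math.floorMod(bucketStartMs, 1000L) * 1_000_000,
                ZoneOffset.UTC);
        return DateTimeFormatter.ofPattern(pattern, locale).format(bucketStart);
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2020-03-15T10:25:00Z").toEpochMilli();
        // Prints year=2020/month=03/day=15/hour=10
        System.out.println(encodedPartition(ts, HOURLY_MS, HOURLY_FORMAT, ZoneId.of("UTC"), Locale.US));
    }
}
```

The timezone matters for daily buckets: with America/New_York, a record stamped 2020-03-15T02:30:00Z lands in day=14 because it is still the evening of March 14 locally, while with UTC it lands in day=15.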

As noted below, the choice of timestamp.extractor affects whether the SFTP sink connector can support exactly-once delivery.
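The extractor matters because exactly-once delivery depends on a deterministic partitioner (see Features), that is, on a replayed record landing in the same encoded partition as before. The following hypothetical sketch contrasts the three extractor values by checking whether an hourly bucket stays the same when a record is reprocessed two hours later. The enum and method names are illustrative, not the connector's API.

```java
class TimestampExtractorSketch {
    // Hypothetical enum mirroring the timestamp.extractor values Wallclock, Record and RecordField.
    enum Extractor { WALLCLOCK, RECORD, RECORD_FIELD }

    // Chooses the timestamp that a time-based partitioner would bucket on.
    static long timestamp(Extractor extractor, long processingTimeMs,
                          long recordTimestampMs, long fieldTimestampMs) {
        switch (extractor) {
            case WALLCLOCK:
                return processingTimeMs;   // changes each time the record is processed
            case RECORD:
                return recordTimestampMs;  // fixed once the record is in Kafka
            case RECORD_FIELD:
                return fieldTimestampMs;   // fixed, read from the record's value
            default:
                throw new IllegalArgumentException("Unknown extractor: " + extractor);
        }
    }

    // Hourly bucket index in UTC.
    static long hourBucket(long timestampMs) {
        return Math.floorDiv(timestampMs, 3_600_000L);
    }

    public static void main(String[] args) {
        long recordTs = 1_584_267_900_000L;      // 2020-03-15T10:25:00Z
        long firstRun = recordTs + 60_000L;      // processed one minute later
        long replay = recordTs + 2 * 3_600_000L; // reprocessed two hours later
        for (Extractor e : Extractor.values()) {
            boolean stable = hourBucket(timestamp(e, firstRun, recordTs, recordTs))
                    == hourBucket(timestamp(e, replay, recordTs, recordTs));
            System.out.println(e + " same bucket on replay: " + stable);
        }
    }
}
```

Running main prints false for WALLCLOCK and true for RECORD and RECORD_FIELD: only the wall-clock choice can send a replayed record to a different directory.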

You can also choose to use a custom partitioner by implementing the io.confluent.connect.storage.partitioner.Partitioner interface, packaging your implementation into a JAR file, and then:

  1. Place the JAR file into the share/java/kafka-connect-sftp directory of your Confluent Platform installation on each worker node.
  2. Restart all of the Connect worker nodes.
  3. Configure SFTP sink connectors to use your fully-qualified partitioner class name.

SFTP File Formats

The SFTP sink connector can serialize multiple records into each SFTP file using a number of formats. The connector’s format.class configuration property identifies the name of the Java class that implements the io.confluent.connect.storage.format.Format interface. The SFTP sink connector comes with several implementations:

  • Avro: Use format.class=io.confluent.connect.sftp.format.avro.AvroFormat to write each SFTP file as an Avro container file that includes the Avro schema followed by one or more records. The connector’s avro.codec configuration property specifies the Avro compression codec; values can be null (the default) for no Avro compression, deflate to use the deflate algorithm as specified in RFC 1951, snappy to use Google’s Snappy compression library, and bzip2 for BZip2 compression. Optionally, set enhanced.avro.schema.support=true to enable enum symbol preservation and package name awareness.
  • JSON: Use format.class=io.confluent.connect.sftp.format.json.JsonFormat to write the SFTP file as a single JSON array containing a JSON object for each record. The connector’s sftp.compression.type configuration property can be set to none (the default) for no compression or gzip for GZip compression.
  • Parquet: Use format.class=io.confluent.connect.sftp.format.parquet.ParquetFormat to write each SFTP file as a Parquet file that includes the Avro schema followed by one or more records.
  • CSV: Use format.class=io.confluent.connect.sftp.format.csv.CsvFormat to write an SFTP file containing one comma-separated line for each record.
  • TSV: Use format.class=io.confluent.connect.sftp.format.tsv.TsvFormat to write an SFTP file containing one tab-separated line for each record.
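The line-per-record formats are easy to picture with a small serializer. This Java sketch is an assumption about escaping, not a description of CsvFormat or TsvFormat: for CSV it applies RFC 4180-style quoting (wrap values containing a comma, double quote, or line break in double quotes and double any embedded quotes), and for TSV, which has no standard quoting, it simply rejects values containing tabs or line breaks. DelimitedLineSketch and its methods are hypothetical names.

```java
import java.util.List;
import java.util.stream.Collectors;

class DelimitedLineSketch {
    // One CSV line per record, with RFC 4180-style quoting.
    static String csvLine(List<?> values) {
        return values.stream()
                .map(v -> csvField(String.valueOf(v)))
                .collect(Collectors.joining(","));
    }

    static String csvField(String s) {
        boolean needsQuotes = s.contains(",") || s.contains("\"")
                || s.contains("\n") || s.contains("\r");
        return needsQuotes ? "\"" + s.replace("\"", "\"\"") + "\"" : s;
    }

    // One TSV line per record; values that would break the line structure are rejected.
    static String tsvLine(List<?> values) {
        return values.stream()
                .map(v -> tsvField(String.valueOf(v)))
                .collect(Collectors.joining("\t"));
    }

    static String tsvField(String s) {
        if (s.contains("\t") || s.contains("\n") || s.contains("\r")) {
            throw new IllegalArgumentException("Value not representable in TSV: " + s);
        }
        return s;
    }

    public static void main(String[] args) {
        // Prints value1,42,"a,b"
        System.out.println(csvLine(List.of("value1", 42, "a,b")));
    }
}
```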

You can also use a custom format by implementing the io.confluent.connect.storage.format.Format interface, packaging your implementation into a JAR file, and then:

  1. Place the JAR file into the share/java/kafka-connect-sftp directory of your Confluent Platform installation on each worker node.
  2. Restart all of the Connect worker nodes.
  3. Configure SFTP sink connectors with format.class set to the fully-qualified class name of your format implementation.