Kafka Connect S3

The S3 connector, currently available as a sink, allows you to export data from Kafka topics to S3 objects in either Avro or JSON format. In addition, for certain data layouts, the S3 connector guarantees exactly-once delivery semantics to consumers of the S3 objects it produces.

Being a sink, the S3 connector periodically polls data from Kafka and in turn uploads it to S3. A partitioner is used to split the data of every Kafka partition into chunks. Each chunk of data is represented as an S3 object whose key name encodes the topic, the Kafka partition, and the start offset of this data chunk. If no partitioner is specified in the configuration, the default partitioner, which preserves Kafka partitioning, is used. The size of each data chunk is determined by the number of records written to S3 and by schema compatibility.

Features

The S3 connector offers a variety of features:

  • Exactly Once Delivery: Records that are exported using a deterministic partitioner are delivered with exactly-once semantics regardless of the eventual consistency of S3.
  • Pluggable Data Format with or without Schema: Out of the box, the connector supports writing data to S3 in Avro and JSON format. Besides records with schema, the connector supports exporting plain JSON records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the Format interface.
  • Schema Evolution: When schemas are used, the connector supports schema evolution based on schema compatibility modes. The available modes are: NONE, BACKWARD, FORWARD and FULL and a selection can be made by setting the property schema.compatibility in the connector's configuration. When the connector observes a schema change, it decides whether to roll the file or project the record to the proper schema according to the schema.compatibility configuration in use.
  • Pluggable Partitioner: The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class, and you can customize time-based partitioning by extending the TimeBasedPartitioner class (a sketch of a custom partitioner follows this list).
  • Non-AWS Object Storage Support: Amazon S3 is an industry-standard object storage service. You can use the Kafka Connect S3 connector to connect to object storage on non-Amazon Web Services (AWS) cloud platforms. For more information, see Using Non-AWS Storage Providers.
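
As a rough illustration of the pluggable partitioner feature, the following sketch extends the DefaultPartitioner shipped with the connector so that every encoded partition directory is prefixed with the record's topic name. The class name and package are hypothetical, and the exact generics and method signatures of the base class may differ between connector versions, so treat this as a starting point rather than a drop-in implementation:

package com.example;

import io.confluent.connect.storage.partitioner.DefaultPartitioner;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical custom partitioner: reuses the default "partition=<kafkaPartition>"
// encoding but prepends the topic name to the encoded partition directory.
public class TopicPrefixPartitioner<T> extends DefaultPartitioner<T> {

  @Override
  public String encodePartition(SinkRecord sinkRecord) {
    return sinkRecord.topic() + "/" + super.encodePartition(sinkRecord);
  }
}

Packaged in a JAR on the Connect worker's classpath, such a class would be selected with partitioner.class=com.example.TopicPrefixPartitioner.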

Install S3 Connector

Important

This connector is bundled natively with Confluent Platform. If you have Confluent Platform installed and running, no additional installation steps are required.

If you do not have Confluent Platform installed and running, you can install the connector using the Confluent Hub client (recommended) or manually download the ZIP file.

Install Connector Using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Platform commercial features.

Navigate to your Confluent Platform installation directory and run this command to install the latest connector version. The connector must be installed on every machine where Connect will be run.

confluent-hub install confluentinc/kafka-connect-s3:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-s3:5.1.0

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

This connector is available under the Confluent Community License.

Exactly-once delivery on top of eventual consistency

The S3 connector is able to provide exactly-once semantics to consumers of the objects it exports to S3, under the condition that the connector is supplied with a deterministic partitioner.

Currently, out of the available partitioners, the default and field partitioners are deterministic. This implies that, when either of these partitioners is used, splitting of files always happens at the same offsets for a given set of Kafka records. These partitioners take into account flush.size and schema.compatibility to decide when to roll and save a new file to S3. The connector always delivers files in S3 that contain the same records, even in the presence of failures. If a connector task fails before an upload completes, the file does not become visible in S3. If, on the other hand, a failure occurs after the upload has completed but before the corresponding offset is committed to Kafka by the connector, then a re-upload will take place. However, such a re-upload is transparent to the user of the S3 bucket, who at any time has access to the same records made eventually available by successful uploads to S3.

Schema Evolution

The S3 connector supports schema evolution and reacts to schema changes of data according to the schema.compatibility configuration. This section explains how the connector reacts to schema evolution under the different values of schema.compatibility, which can be set to NONE, BACKWARD, FORWARD, or FULL.

  • NO Compatibility: By default, schema.compatibility is set to NONE. In this case, the connector ensures that each file written to S3 has the proper schema. When the connector observes a schema change in the data, it commits the current set of files for the affected topic partitions and writes the data with the new schema to new files.

  • BACKWARD Compatibility: If a schema is evolved in a backward compatible way, we can always use the latest schema to query all the data uniformly. For example, removing a field is a backward compatible change to a schema, since when we encounter records written with the old schema that contain this field, we can simply ignore it. Adding a field with a default value is also backward compatible.

    If BACKWARD is specified in schema.compatibility, the connector keeps track of the latest schema used in writing data to S3. If a data record arrives with a schema version newer than the current latest schema, the connector commits the current set of files and writes the data record with the new schema to new files. For data records arriving later with a schema of an earlier version, the connector projects the data record to the latest schema before writing to the same set of files in S3.

  • FORWARD Compatibility: If a schema is evolved in a forward compatible way, we can always use the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema will use the default value when the field is missing.

    If FORWARD is specified in the schema.compatibility, the connector projects the data to the oldest schema before writing to the same set of files in S3.

  • FULL Compatibility: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema.

    If FULL is specified in the schema.compatibility, the connector performs the same action as BACKWARD.

Schema evolution in the S3 connector works in the same way as in the HDFS connector.
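
To make the compatibility directions concrete, the following illustrative Java snippet defines two versions of a schema with Apache Avro. The schemas are hypothetical, and the SchemaCompatibility helper is assumed to be available in your Avro version. Removing the field f2, which has no default value, is backward compatible but not forward compatible, matching the rules above:

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class CompatibilityExample {
  public static void main(String[] args) {
    // Version 1 of a hypothetical record schema: fields f1 and f2.
    Schema v1 = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":["
        + "{\"name\":\"f1\",\"type\":\"string\"},"
        + "{\"name\":\"f2\",\"type\":\"string\"}]}");

    // Version 2 removes f2 (which had no default value).
    Schema v2 = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":["
        + "{\"name\":\"f1\",\"type\":\"string\"}]}");

    // BACKWARD direction: can a v2 reader read data written with v1? Yes, f2 is ignored.
    System.out.println(SchemaCompatibility
        .checkReaderWriterCompatibility(v2, v1).getType());  // COMPATIBLE

    // FORWARD direction: can a v1 reader read data written with v2? No, f2 is
    // missing from the data and has no default value in the reader schema.
    System.out.println(SchemaCompatibility
        .checkReaderWriterCompatibility(v1, v2).getType());  // INCOMPATIBLE
  }
}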

Automatic Retries

The S3 connector may experience problems writing to the S3 bucket due to network partitions, interruptions, or even AWS throttling limits. In many cases, the connector will retry the request a number of times before failing. To avoid further overloading the network or the S3 service, the connector uses an exponential backoff technique to give the network and/or service time to recover. The technique adds randomness, called jitter, to the calculated backoff times to prevent a thundering herd, where large numbers of requests from many tasks are submitted concurrently and overwhelm the service. Randomness spreads out the retries from many tasks and should reduce the overall time required to complete all outstanding requests compared to simple exponential backoff. The goal is to spread out the requests to S3 as much as possible.

The maximum number of retry attempts is dictated by the s3.part.retries S3 connector configuration property, which defaults to 3 attempts. The delay between retries depends on the connector's s3.retry.backoff.ms configuration property, which defaults to 200 milliseconds. The actual delay is randomized, but the maximum delay can be calculated as a function of the number of retry attempts with ${s3.retry.backoff.ms} * 2 ^ (retry-1), where retry is the number of attempts taken so far in the current iteration. To keep the maximum delay within a reasonable duration, it is capped at 24 hours. For example, the following table shows the possible wait times before submitting each of the 3 retry attempts:

Range of backoff times for each retry using the default configuration
Retry   Minimum Backoff (sec)   Maximum Backoff (sec)   Total Potential Delay from First Attempt (sec)
1       0.0                     0.2                     0.2
2       0.0                     0.4                     0.6
3       0.0                     0.8                     1.4

Increasing the maximum number of retries adds more backoff:

Range of backoff times for additional retries
Retry   Minimum Backoff (sec)   Maximum Backoff (sec)   Total Potential Delay from First Attempt (sec)
4       0.0                     1.6                     3.0
5       0.0                     3.2                     6.2
6       0.0                     6.4                     12.6
7       0.0                     12.8                    25.4
8       0.0                     25.6                    51.0
9       0.0                     51.2                    102.2
10      0.0                     102.4                   204.6

At some point, the maximum backoff time saturates and is capped at 24 hours. In the example below, all attempts from retry 20 onward have a maximum backoff time of 24 hours (86400 seconds):

Range of backoff times when reaching the cap of 24 hours
Retry   Minimum Backoff (sec)   Maximum Backoff (sec)   Total Potential Delay from First Attempt (sec)
15      0.0                     3276.8                  6553.4
16      0.0                     6553.6                  13107.0
17      0.0                     13107.2                 26214.2
18      0.0                     26214.4                 52428.6
19      0.0                     52428.8                 104857.4
20      0.0                     86400.0                 191257.4
21      0.0                     86400.0                 277657.4

It's not advised to set s3.part.retries too high since making more attempts after reaching a cap of 24 hours isn't practical. You can adjust both the s3.part.retries and s3.retry.backoff.ms connector configuration properties to achieve the desired retry and backoff characteristics.
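
As an illustration only (this is not the connector's internal code), the capped exponential backoff with jitter described above can be reproduced with a few lines of arithmetic, assuming the default s3.retry.backoff.ms of 200 milliseconds:

import java.util.concurrent.ThreadLocalRandom;

public class BackoffExample {
  static final long RETRY_BACKOFF_MS = 200L;               // s3.retry.backoff.ms default
  static final long MAX_BACKOFF_MS = 24L * 60 * 60 * 1000; // 24-hour cap

  // Maximum possible delay before the given retry attempt (1-based):
  // ${s3.retry.backoff.ms} * 2 ^ (retry - 1), capped at 24 hours.
  // (For simplicity this sketch ignores overflow for very large retry numbers.)
  static long maxBackoffMs(int retry) {
    return Math.min(RETRY_BACKOFF_MS << (retry - 1), MAX_BACKOFF_MS);
  }

  // The actual delay is randomized (jitter): anywhere between 0 and the maximum.
  static long jitteredBackoffMs(int retry) {
    return ThreadLocalRandom.current().nextLong(maxBackoffMs(retry) + 1);
  }

  public static void main(String[] args) {
    for (int retry = 1; retry <= 5; retry++) {
      System.out.printf("retry %d: up to %.1f s%n", retry, maxBackoffMs(retry) / 1000.0);
    }
    // Prints 0.2, 0.4, 0.8, 1.6, 3.2 -- matching the maximum backoff columns above.
  }
}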

Quick Start

In this quick start, you use the S3 connector to export data produced by the Avro console producer to S3.

Before you begin, create an AWS S3 destination bucket and grant write access to the user or IAM role completing these procedures. See Setting Bucket and Object Permissions for additional information.

Next, start the services with one command using Confluent CLI:

Tip

If not already in your PATH, add Confluent's bin directory by running: export PATH=<path-to-confluent>/bin:$PATH

confluent start

Every service will start in order, printing a message with its status:

Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

Note

Make sure the S3 connector has write access to the S3 bucket shown in s3.bucket.name and can deploy credentials successfully. See S3 Connector Credentials for detailed information about setting up credential providers.

To import a few records with a simple schema into Kafka, start the Avro console producer as follows:

  ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic s3_topic \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Then, in the console producer, type in:

{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
{"f1": "value4"}
{"f1": "value5"}
{"f1": "value6"}
{"f1": "value7"}
{"f1": "value8"}
{"f1": "value9"}

The nine records entered are published to the Kafka topic s3_topic in Avro format.

Before starting the connector, make sure that the configuration in etc/kafka-connect-s3/quickstart-s3.properties is properly set for your S3 environment: for example, s3.bucket.name points to your bucket, s3.region matches your bucket's region, and flush.size=3 for this example. Then start the S3 connector by loading its configuration with the following command:

  confluent load s3-sink
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "s3_topic",
    "s3.region": "us-west-2",
    "s3.bucket.name": "confluent-kafka-connect-s3-testing",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "name": "s3-sink"
  },
  "tasks": []
}

To check that the connector started successfully, view the Connect worker's log by running:

confluent log connect

Towards the end of the log you should see that the connector starts, logs a few messages, and then uploads data from Kafka to S3. Once the connector has ingested some records, check that the data is available in S3, for instance by using the AWS CLI:

aws s3api list-objects --bucket "your-bucket-name"

You should see three objects with keys:

topics/s3_topic/partition=0/s3_topic+0+0000000000.avro
topics/s3_topic/partition=0/s3_topic+0+0000000003.avro
topics/s3_topic/partition=0/s3_topic+0+0000000006.avro

Each file is encoded as <topic>+<kafkaPartition>+<startOffset>.<format>.
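
As an aside, if you ever need to recover the topic, Kafka partition, and starting offset from an object key programmatically, the encoding can be parsed with plain string handling. The following is a small illustrative sketch, not part of the connector:

public class S3KeyParser {
  public static void main(String[] args) {
    String key = "topics/s3_topic/partition=0/s3_topic+0+0000000000.avro";

    // The file name is <topic>+<kafkaPartition>+<startOffset>.<format>.
    String fileName = key.substring(key.lastIndexOf('/') + 1);
    String base = fileName.substring(0, fileName.lastIndexOf('.'));
    String[] parts = base.split("\\+");   // Kafka topic names cannot contain '+'

    System.out.println("topic=" + parts[0]
        + " partition=" + Integer.parseInt(parts[1])
        + " startOffset=" + Long.parseLong(parts[2]));
  }
}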

To verify the contents, first copy each file from S3 to your local filesystem, for instance by running:

aws s3 cp s3://<your-bucket>/topics/s3_topic/partition=0/s3_topic+0+0000000000.avro .

and use avro-tools-1.8.2.jar (available in Apache mirrors) to print the records:

java -jar avro-tools-1.8.2.jar tojson s3_topic+0+0000000000.avro

For the file above, you should see the following output:

{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}

with the rest of the records contained in the other two files.

Finally, stop the Connect worker as well as all the rest of the Confluent services by running:

confluent stop

Your output should resemble:

Stopping control-center
control-center is [DOWN]
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]

or stop all the services and additionally wipe out any data generated during this quick start by running:

confluent destroy

Your output should resemble:

Stopping control-center
control-center is [DOWN]
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE

S3 Connector Credentials

By default, the S3 connector looks for S3 credentials in the following locations and in the following order:

  1. The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables accessible to the Connect worker processes where the connector will be deployed. These variables are recognized by the AWS CLI and all AWS SDKs (except for the AWS SDK for .NET). You use export to set these variables.

    export AWS_ACCESS_KEY_ID=<your_access_key_id>
    export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>
    

    The AWS_ACCESS_KEY and AWS_SECRET_KEY can be used instead, but are not recognized by the AWS CLI.

  2. The aws.accessKeyId and aws.secretKey Java system properties on the Connect worker processes where the connector will be deployed. However, these variables are only recognized by the AWS SDK for Java and are not recommended.

  3. The ~/.aws/credentials file located in the home directory of the operating system user that runs the Connect worker processes. These credentials are recognized by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

    aws configure
    

    You can also manually create the credentials file using a text editor. The file should contain lines in the following format:

    [default]
    aws_access_key_id = <your_access_key_id>
    aws_secret_access_key = <your_secret_access_key>
    

    Note

    When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user's home directory. Otherwise, the S3 connector will not be able to find the credentials.

    See AWS Credentials File Format for additional details.

  4. A query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials. This is applicable only if the Connect worker processes are running in AWS containers.

  5. A metadata query that returns credentials from an EC2 instance. This is applicable only if the Connect worker processes are running in EC2 instances.

Choose one of the above to define the AWS credentials that the S3 connectors use, verify the credentials implementation is set correctly, and then restart all of the Connect worker processes.

Note

Confluent recommends using either environment variables or a credentials file because these are the most straightforward, and they can be checked using the AWS CLI tool before running the connector.

All S3 connectors run in a single Connect worker cluster and use the same credentials. This is sufficient for many use cases. If you want more control, refer to the following section to learn more about controlling and customizing how the S3 connector gets AWS credentials.

Credentials Providers

A credentials provider is a Java class that implements the com.amazonaws.auth.AWSCredentialsProvider interface in the AWS Java library and returns AWS credentials from the environment. By default, the S3 connector configuration property s3.credentials.provider.class uses the com.amazonaws.auth.DefaultAWSCredentialsProviderChain class. This class and interface implementation chains together five other credential provider classes.

The com.amazonaws.auth.DefaultAWSCredentialsProviderChain implementation looks for credentials in the following order:

  1. Environment variables using the com.amazonaws.auth.EnvironmentVariableCredentialsProvider class implementation. This implementation uses environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. Environment variables AWS_ACCESS_KEY and AWS_SECRET_KEY are also supported by this implementation; however, these two variables are only recognized by the AWS SDK for Java and are not recommended.

  2. Java system properties using the com.amazonaws.auth.SystemPropertiesCredentialsProvider class implementation. This implementation uses Java system properties aws.accessKeyId and aws.secretKey.

  3. Credentials file using the com.amazonaws.auth.profile.ProfileCredentialsProvider class implementation. This implementation uses a credentials file located in the path ~/.aws/credentials. This credentials provider can be used by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

    aws configure
    

    You can also manually create the credentials file using a text editor. The file should contain lines in the following format:

    [default]
    aws_access_key_id = <your_access_key_id>
    aws_secret_access_key = <your_secret_access_key>
    

    Note

    When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user's home directory. Otherwise, the S3 connector will not be able to find the credentials.

    See AWS Credentials File Format for additional details.

  4. Amazon Elastic Container Service (ECS) container credentials using the com.amazonaws.auth.ContainerCredentialsProvider class implementation. This implementation uses a query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials for the S3 connector. For this provider to work, the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI must be set. See IAM Roles for Tasks for additional information about setting up this query.

  5. EC2 instance profile credentials using the com.amazonaws.auth.InstanceProfileCredentialsProvider class implementation. EC2 instance metadata is queried for credentials. See Amazon EC2 metadata service for additional information about instance metadata queries.

    Note

    EC2 instance profile credentials can be used only if the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is not set. See com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper for more information.

See Working with AWS credentials for additional information and updates from AWS.

Using Other Implementations

You can use a different credentials provider. To do this, set the s3.credentials.provider.class property to the name of any class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.

Complete the following steps to use a different credentials provider:

  1. Find or create a Java credentials provider class that implements the com.amazonaws.auth.AWSCredentialsProvider interface (a minimal sketch follows this procedure).

  2. Put the class file in a JAR file.

  3. Place the JAR file in the share/java/kafka-connect-s3 directory on all Connect workers.

  4. Restart the Connect workers.

  5. Change the S3 connector property file to use your custom credentials. Add the provider class entry s3.credentials.provider.class=<className> in the S3 connector properties file.

    Important

    You must use the fully qualified class name in the <className> entry.
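
For example, a minimal provider that reads keys from custom environment variables might look like the following sketch. The class, package, and environment variable names are hypothetical; only the com.amazonaws.auth types come from the AWS SDK for Java:

package com.example;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;

// Hypothetical provider that reads keys from custom environment variables.
// Package it in a JAR, place the JAR in share/java/kafka-connect-s3 on every
// Connect worker, and set s3.credentials.provider.class=com.example.EnvCredentialsProvider
public class EnvCredentialsProvider implements AWSCredentialsProvider {

  @Override
  public AWSCredentials getCredentials() {
    return new BasicAWSCredentials(
        System.getenv("MY_S3_ACCESS_KEY"),
        System.getenv("MY_S3_SECRET_KEY"));
  }

  @Override
  public void refresh() {
    // No cached state: credentials are re-read from the environment on every call.
  }
}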

See the following section for example S3 connector property file settings. For detailed descriptions of all the available configuration options of the S3 connector, see S3 Connector Configuration Options.

Example S3 Connector Property File Settings

The example settings are contained in etc/kafka-connect-s3/quickstart-s3.properties as follows:

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=s3_topic
flush.size=3

The first few settings are common to most connectors. topics specifies the topics we want to export data from, in this case s3_topic. The property flush.size specifies the number of records per partition the connector needs to write before completing a multipart upload to S3.

s3.bucket.name=confluent-kafka-connect-s3-testing
s3.part.size=5242880

The next settings are specific to Amazon S3. A mandatory setting is the name of your S3 bucket to host the exported Kafka records. Other useful settings are s3.region, which you should set if you use a region other than the default, and s3.part.size to control the size of each part in the multipart uploads that will be used to upload a single chunk of Kafka records.

storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.avro.AvroFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

These class settings are required to specify the storage interface (here S3), the output file format (currently io.confluent.connect.s3.format.avro.AvroFormat or io.confluent.connect.s3.format.json.JsonFormat), and the partitioner class along with its schema generator class. When using a format with no schema definition, it is sufficient to set the schema generator class to its default value.

schema.compatibility=NONE

Finally, schema evolution is disabled in this example by setting schema.compatibility to NONE, as explained above.

For detailed descriptions of all the available configuration options of the S3 connector, see S3 Connector Configuration Options.

Write raw message values into S3

It is possible to use the S3 connector to write out the unmodified original message values into newline-separated files in S3. We accomplish this by telling Connect to not deserialize any of the messages, and by configuring the S3 connector to store the message values in a binary format in S3.

The first part of our S3 connector is similar to other examples:

name=s3-raw-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=s3_topic
flush.size=3

The topics setting specifies the topics we want to export data from, in this case s3_topic. The property flush.size specifies the number of records per partition the connector needs to write before completing a multipart upload to S3.

Next we need to configure the particulars of Amazon S3:

s3.bucket.name=confluent-kafka-connect-s3-testing
s3.region=us-west-2
s3.part.size=5242880
s3.compression.type=gzip

The s3.bucket.name is mandatory and names the S3 bucket where the exported Kafka records are written. Another useful setting is s3.region, which you should set if you use a region other than the default. Since the S3 connector uses multipart uploads, you can use s3.part.size to control the size of each of the contiguous parts used to upload Kafka records into a single S3 object. The part size affects throughput and latency, as an S3 object is visible/available only after all parts are uploaded. The s3.compression.type setting specifies that we want the S3 connector to compress our S3 objects using GZIP compression, adding the .gz extension to any files (see below).

So far this example configuration is relatively typical of most S3 connectors. Now let's specify that we should read the raw message values and write them in binary format:

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
storage.class=io.confluent.connect.s3.storage.S3Storage
schema.compatibility=NONE

The value.converter setting overrides, for this connector alone, the default converter set in the Connect worker configuration, and we use the ByteArrayConverter to instruct Connect to skip deserializing the message values and instead hand the connector the message values in their raw binary form. We use the format.class setting to instruct the S3 connector to write these binary message values as-is into S3 objects. By default the message values written to the same S3 object are separated by a newline character sequence, but you can control this with the format.bytearray.separator setting, which you may want to consider if your messages might contain newlines. Also, by default the files written to S3 have an extension of .bin (before compression, if enabled), or you can use the format.bytearray.extension setting to change the pre-compression filename extension.
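
As a rough sketch of what a downstream application might do with such an object after downloading and decompressing it, the message values can be recovered by splitting on the separator. The file name below is hypothetical, and the snippet assumes the default newline separator and UTF-8 text payloads; truly binary payloads would need a byte-level split and a separator guaranteed not to occur inside a value:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class RawObjectReader {
  public static void main(String[] args) throws Exception {
    // A locally downloaded, already-decompressed object written with ByteArrayFormat.
    byte[] data = Files.readAllBytes(Paths.get("s3_topic+0+0000000000.bin"));

    // Records are separated by format.bytearray.separator (a newline by default).
    String[] values = new String(data, StandardCharsets.UTF_8).split("\n");
    System.out.println("Recovered " + values.length + " message values");
  }
}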

Next we need to decide how we want to partition the consumed messages in S3 objects. We have a few options, including the default partitioner that preserves the same partitions as in Kafka:

partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

Or, we could instead partition by the timestamp of the Kafka messages:

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Record

or the timestamp that the S3 connector processes each message:

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Wallclock

Custom partitioners are always an option, too. Just be aware that since the record value is an opaque binary value, we cannot extract timestamps from fields using the RecordField option.

The S3 connector configuration outlined above results in newline-delimited gzipped objects in S3 with a .bin.gz extension.

Using Non-Amazon Web Services (AWS) Storage Providers

Many cloud providers implement an Amazon Web Services (AWS) S3-compatible API. You can use the Kafka Connect S3 connector to connect to object storage on their platform. When configuring the S3 connector for object storage on other cloud providers, include the following configuration option (if applicable for the cloud provider):

store.url

The object storage connection URL.

  • Type: string
  • Default: null
  • Importance: high

Important

Any Amazon Web Services (AWS) S3-compatible API you use must support multi-part uploads for the Kafka Connect S3 connector. See Multipart Upload Overview for more information.