Backup and Restore Amazon S3 Source Connector for Confluent Platform

The Backup and Restore Kafka Connect Amazon S3 Source connector reads data exported to S3 by the Amazon S3 Sink connector and publishes it back to an Apache Kafka® topic. Depending on the format and partitioner used to write the data to S3, this connector can write to the destination topic using the same partitions as the original messages exported to S3 and maintain the same message order. The connector selects folders based on the partitioner configuration and reads each folder's S3 objects in alphabetical order. Each record is read based on the format selected. The connector configuration mirrors that of the Kafka Amazon S3 Sink connector, so you should be able to reuse the original sink configuration with only minor changes.

Important

The recommended practice is to create topics manually in the destination Kafka cluster with the correct number of partitions before running the source connector. If the topics do not exist, Connect relies on auto topic creation, and the number of partitions is based on the Kafka broker defaults. If there are more partitions in the destination cluster, the extra partitions are not used. If there are fewer partitions in the destination cluster, the connector task throws an exception and stops the moment it tries to write to a Kafka partition that does not exist.
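
For example, you might pre-create the destination topic with the same partition count as the original source topic before starting the connector. The following is a minimal sketch that assumes a destination cluster at localhost:9092, a topic named s3_topic, and six partitions; adjust the values to match your environment:

kafka-topics --bootstrap-server localhost:9092 \
  --create \
  --topic s3_topic \
  --partitions 6 \
  --replication-factor 3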

Be aware of the following connector actions:

  • The connector ignores any S3 object with a name that does not start with the configured topics directory. This name is “/topics/” by default.
  • The connector ignores any S3 object that is below the topics directory but has an extension that does not match the configured format. For example, a JSON file is ignored when format.class is set for Avro files.
  • The connector stops and fails if the S3 object’s name does not match the expected format or is in an unexpected location.
  • The number of tasks the connector can spawn is bound by the number of source partitions and the tasks.max value. If there are fewer source partitions than the tasks.max value, the connector stops spawning tasks after reaching the number of source partitions, even if the value of tasks.max is greater.

Avoid the following configuration issues:

  • A file with the correct extension and a valid name format (for example, <topic>+<partition>+<offset>.<extension>) that is placed in the folder of a different topic is still read normally and written to whatever topic its filename defines.
  • A field partitioner that is incorrectly configured to match the expected folder layout can break the ordering guarantees of data written by an S3 sink that used a deterministic partitioner.

Caution

There is a risk with using the S3 connector and the ByteArrayFormat and ByteArrayConverter for data containing delimiter characters internally (newlines by default). The risk is that these records may be read back (sourced) incorrectly. For example: You have data that starts out in Avro format. You use the ByteArrayFormat and ByteArrayConverter to sink the data (containing delimiter characters) to S3 storage using the S3 Sink connector. When you later source the data to a Kafka topic using the S3 Source connector, the original Avro data may be corrupted and potentially unretrievable.

Features

The Backup and Restore S3 Source connector includes the following features:

At least once delivery

In the event of a task failure, the connector guarantees that no messages are lost, although the last few messages may be processed again.

Multiple tasks

The Backup and Restore Amazon S3 Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Multiple tasks can lead to significant performance gains when many files need to be parsed.
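
For example, the following sketch requests up to four tasks; the effective number of running tasks is still capped by the number of source partitions:

tasks.max=4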

Pluggable data format with or without schema

Out of the box, the connector supports reading data from S3 in Avro and JSON format. Besides records with schema, the connector supports importing plain JSON records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the Format interface.
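
The format is selected with the format.class configuration property. The Avro class below also appears in the quick start later on this page; the JSON and raw-bytes class names are shown as assumptions that follow the same package convention, so confirm them against the connector configuration reference before use:

# Avro files written by the S3 Sink connector
format.class=io.confluent.connect.s3.format.avro.AvroFormat

# JSON records, one per line (assumed class name)
#format.class=io.confluent.connect.s3.format.json.JsonFormat

# Raw bytes (assumed class name)
#format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat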

Non-AWS object storage support

Amazon S3 is an industry-standard object storage service. You can use the Kafka Connect Backup and Restore S3 Source connector to connect to object storage on non-AWS cloud platforms by using a different store URL that points to the alternative cloud platform.
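
For example, assuming the alternative platform exposes an S3-compatible endpoint and that the store.url property is used to override the endpoint, the configuration might look like the following sketch (the endpoint shown is a placeholder):

store.url=https://object-storage.example-cloud.internal
s3.bucket.name=my-bucket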

Matching source partitioning

Messages are put back on the same Kafka partition of the topic that they were originally written to.

Source partition ordering

The connector reads records back in time order within each topic-source partition if the DefaultPartitioner or a TimeBasedPartitioner is used. If a FieldPartitioner is used, it isn't possible to guarantee the order of the messages.

Pluggable partitioner

The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class. Additionally, you can customize time-based partitioning by extending the TimeBasedPartitioner class.
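
For example, to read objects written with hourly time-based partitioning, the partitioner-related properties might look like the following sketch, which assumes the settings mirror those used by the original sink connector:

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
partition.duration.ms=3600000
locale=en-US
timezone=UTC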

Tip

By default, connectors inherit the partitioner used for the Kafka topic. You can create a custom partitioner for a connector, which you must place in the connector’s /lib folder.

You can also put partitioners in a common location of your choice. If you choose this option, you must add a symlink to the location from each connector’s /lib folder. For example, you would place a custom partitioner in the path share/confluent-hub-components/partitioners and then add the symlink share/confluent-hub-components/kafka-connect-s3/lib/partitioners -> ../../partitioners.
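
For example, the symlink described above could be created with a command like the following sketch, run from the Confluent Platform installation directory:

ln -s ../../partitioners share/confluent-hub-components/kafka-connect-s3/lib/partitioners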

Limitations

The Backup and Restore Amazon S3 Source connector has the following limitations:

  • The connector does not read (or restore) keys and headers exported by the Amazon S3 Sink connector.

  • The connector does not support the s3.proxy.url configuration property.

  • Partitioners recognize new topic folders based on polling intervals and task configuration updates. The DefaultPartitioner detects new partition folders. The FieldPartitioner notices new folders for the fields specified using partition.field.name. However, the TimeBasedPartitioner does not detect new files for a new time period.

    It is important to note that for the TimeBasedPartitioner, the capacity to scale the connector across various time ranges is limited in Backup and Restore mode. Currently, the connector does not support processing data that spans several years.

  • A continuous feedback loop is created if both the Amazon S3 Sink connector and the source connector use the same Kafka cluster, because the source connector writes to the same topic being consumed by the sink connector. The resulting feedback loop creates an ever-increasing number of duplicate Kafka records and S3 objects. You can avoid the feedback loop by writing to a different topic than the one being consumed by the sink connector. For example, use the RegexRouter SMT with the source connector to change the names of the topics where the records are written, or use the ExtractTopic SMT with the source connector to change the topic name based on a message field (see the sketch after this list).

    Note that the S3 Source connector fails when attempting to use the Filter SMT. This is due to an issue with the Connect framework (ce-kafka) where the connector stops sourcing further files if all records in a file are filtered through SMT.

  • The S3 Sink connector can compress files before pushing them to the S3 bucket. However, the S3 Source connector cannot read compressed files from the S3 bucket. Be sure to leave files uncompressed if you are storing data that will later be pulled from storage by the S3 source connector.

  • Source data written to Confluent Platform and Confluent Cloud by the S3 Source connector must be in the original format written by the S3 Sink connector.
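
As referenced in the feedback-loop limitation above, one way to route records to a different topic is the ExtractTopic SMT. The following sketch assumes the Confluent ExtractTopic transform is available on the Connect worker and that each record value contains a field named destination_topic (a hypothetical field name):

transforms=SetTopic
transforms.SetTopic.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.SetTopic.field=destination_topic

A RegexRouter alternative that prefixes the original topic name is shown in the Quick Start later on this page.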

Install the Backup and Restore Amazon S3 Source Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-s3-source:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-s3-source:1.0.0-preview
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent license properties for license properties and information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Amazon S3 Source Connector Configuration Properties.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

AWS Credentials

The following sections provide information about how to configure an S3 connector to provide credentials when connecting to AWS.

Credentials provider chain

By default, the S3 connector looks for S3 credentials in the following locations and in the following order:

  1. The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables accessible to the Connect worker processes where the connector will be deployed. These variables are recognized by the AWS CLI and all AWS SDKs (except for the AWS SDK for .NET). You use export to set these variables.

    export AWS_ACCESS_KEY_ID=<your_access_key_id>
    export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>
    

    The AWS_ACCESS_KEY and AWS_SECRET_KEY can be used instead, but are not recognized by the AWS CLI.

  2. The aws.accessKeyId and aws.secretKey Java system properties on the Connect worker processes where the connector will be deployed. However, these properties are only recognized by the AWS SDK for Java and are not recommended.

  3. The ~/.aws/credentials file located in the home directory of the operating system user that runs the Connect worker processes. These credentials are recognized by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

    aws configure
    

    You can also manually create the credentials file using a text editor. The file should contain lines in the format shown in the example below. See AWS Credentials File Format for additional details.

    [default]
    aws_access_key_id = <your_access_key_id>
    aws_secret_access_key = <your_secret_access_key>
    

    Note

    When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user’s home directory. Otherwise, the S3 connector will not be able to find the credentials.

  4. A query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials. This is applicable only if the Connect worker processes are running in AWS containers.

  5. A metadata query that returns credentials from an EC2 instance. This is applicable only if the Connect worker processes are running in EC2 instances.

Choose one of the above to define the AWS credentials that the S3 connectors use, verify the credentials implementation is set correctly, and then restart all of the Connect worker processes.

Note

Confluent recommends using either environment variables or a credentials file because these are the most straightforward options, and they can be checked using the AWS CLI tool before running the connector.
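
For example, you can check which credentials the AWS CLI resolves before starting the connector. The following sketch assumes the AWS CLI is installed on the Connect worker host and that the connector reads from a bucket named my-bucket:

# Show the identity that the resolved credentials map to
aws sts get-caller-identity

# Confirm the credentials can list the source bucket
aws s3 ls s3://my-bucket/topics/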

All S3 connectors run in a single Connect worker cluster and use the same credentials. This is sufficient for many use cases. If you want more control, refer to the following section to learn more about controlling and customizing how the S3 connector gets AWS credentials.

Caution

If you configure one of the AWS key and AWS secret key implementations (as detailed above), credentials cannot be supplied through the following credentials providers or by using the Trusted Account Credentials implementation. Attempting to provide credentials using multiple implementations causes an authentication failure.

Credentials providers

A credentials provider is a Java class that implements the com.amazonaws.auth.AWSCredentialsProvider interface in the AWS Java library and returns AWS credentials from the environment. By default, the S3 connector configuration property s3.credentials.provider.class uses the com.amazonaws.auth.DefaultAWSCredentialsProviderChain class. This class and interface implementation chains together five other credential provider classes.

The com.amazonaws.auth.DefaultAWSCredentialsProviderChain implementation looks for credentials in the following order:

  1. Environment variables using the com.amazonaws.auth.EnvironmentVariableCredentialsProvider class implementation. This implementation uses environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. Environment variables AWS_ACCESS_KEY and AWS_SECRET_KEY are also supported by this implementation; however, these two variables are only recognized by the AWS SDK for Java and are not recommended.

  2. Java system properties using the com.amazonaws.auth.SystemPropertiesCredentialsProvider class implementation. This implementation uses Java system properties aws.accessKeyId and aws.secretKey.

  3. Credentials file using the com.amazonaws.auth.profile.ProfileCredentialsProvider class implementation. This implementation uses a credentials file located in the path ~/.aws/credentials. This credentials provider can be used by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

    aws configure
    

    You can also manually create the credentials file using a text editor. The file should contain lines in the format shown in the example below. See AWS Credentials File Format for additional details.

    [default]
    aws_access_key_id = <your_access_key_id>
    aws_secret_access_key = <your_secret_access_key>
    

    Note

    When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user’s home directory. Otherwise, the S3 connector will not be able to find the credentials.

  4. Amazon Elastic Container Service (ECS) container credentials using the com.amazonaws.auth.ContainerCredentialsProvider class implementation. This implementation uses a query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials for the S3 connector. For this provider to work, the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI must be set. See IAM Roles for Tasks for additional information about setting up this query.

  5. EC2 instance profile credentials using the com.amazonaws.auth.InstanceProfileCredentialsProvider class implementation. EC2 instance metadata is queried for credentials. See Amazon EC2 metadata service for additional information about instance metadata queries. See Working with AWS credentials for additional information and updates from AWS.

    Note

    EC2 instance profile credentials can be used only if the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is not set. See com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper for more information.

Using Trusted Account Credentials

This connector can assume a role and use credentials from a separate trusted account. This is a default feature provided with recent versions of this connector that include an updated version of the AWS SDK.

Important

You cannot use assumed role credentials to access AWS through a proxy server without first passing environment variables or system properties. This is due to an AWS SDK limitation.

After you create the trust relationship, an IAM user or an application from the trusted account can use the AWS Security Token Service (AWS STS) AssumeRole API operation. This operation provides temporary security credentials that enable access to AWS resources for the connector. For details, see Creating a Role to Delegate Permissions to an IAM User.

Example:
Profile in ~/.aws/credentials:

[default]
role_arn=arn:aws:iam::037803949979:role/kinesis_cross_account_role
source_profile=staging
role_session_name = OPTIONAL_SESSION_NAME

[staging]
aws_access_key_id = <STAGING KEY>
aws_secret_access_key = <STAGING SECRET>

To allow the connector to assume a role with the right permissions, set the Amazon Resource Name (ARN) for this role. Additionally, you must choose between source_profile and credential_source as the way to get credentials that have permission to assume the role in the environment where the connector is running.

Note

When setting up trusted account credentials, be aware that the approach of loading profiles from both ~/.aws/credentials and ~/.aws/config does not work when configuring this connector. Assumed role settings and credentials must be placed in the ~/.aws/credentials file.

Additionally, the connector implements the AwsAssumeRoleCredentialsProvider, which means you can use the following configs to configure the assume role operation.

s3.credentials.provider.class=AwsAssumeRoleCredentialsProvider
sts.role.arn=arn:aws:iam::012345678901:role/my-restricted-role
sts.role.session.name=session-name
sts.role.external.id=external-id

Using Other Implementations

You can use a different credentials provider. To do this, set the s3.credentials.provider.class property to the name of any class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.

Complete the following steps to use a different credentials provider:

  1. Find or create a Java credentials provider class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.

  2. Put the class file in a JAR file.

  3. Place the JAR file in the share/java/kafka-connect-s3 directory on all Connect workers.

  4. Restart the Connect workers.

  5. Change the S3 connector property file to use your custom credentials. Add the provider class entry s3.credentials.provider.class=<className> in the S3 connector properties file.

    Important

    You must use the fully-qualified class name in the <className> entry.
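
For example, assuming a hypothetical provider class com.example.auth.MyAWSCredentialsProvider packaged and deployed as described above, the property entry would look like the following sketch:

s3.credentials.provider.class=com.example.auth.MyAWSCredentialsProvider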

Quick Start

The following uses the S3SinkConnector to write a file from the Kafka topic named s3_topic to S3. Then, the S3SourceConnector loads that Avro file from S3 to the Kafka topic named copy_of_s3_topic.

  1. Follow the instructions from the S3 Sink connector quick start to set up the data to use below.

  2. Install the connector through the Confluent Hub Client.

    # run from your Confluent Platform installation directory
    confluent connect plugin install confluentinc/kafka-connect-s3-source:latest
    

    Tip

    By default, the plugin is installed into the share/confluent-hub-components directory and this directory is added to the plugin path. For the plugin path change to take effect, you must restart the Connect worker.

  3. Create a quickstart-s3source.properties file with the following contents:

    name=s3-source
    tasks.max=1
    connector.class=io.confluent.connect.s3.source.S3SourceConnector
    s3.bucket.name=confluent-kafka-connect-s3-testing
    format.class=io.confluent.connect.s3.format.avro.AvroFormat
    confluent.license=
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    

    Tip

    The confluent.topic.* properties define where the Confluent license is stored in Kafka, so the Kafka bootstrap addresses are required. The confluent.topic.replication.factor value may not be larger than the number of Kafka brokers in the destination cluster, so it is set to ‘1’ here for demonstration purposes. Always use at least ‘3’ in production configurations.

  4. Edit the quickstart-s3source.properties to add the following properties:

    transforms=AddPrefix
    transforms.AddPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
    transforms.AddPrefix.regex=.*
    transforms.AddPrefix.replacement=copy_of_$0
    

    Important

    Adding this renames the output topic of the messages to copy_of_s3_topic. This prevents a continuous feedback loop of messages.

  5. Load the Backup and Restore S3 Source connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services connect connector load s3-source --config quickstart-s3source.properties
    

    Important

    Don’t use the Confluent CLI in production environments.

  6. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status s3-source
    
  7. Confirm that the messages are being sent to Kafka.

    kafka-avro-console-consumer \
        --bootstrap-server localhost:9092 \
        --property schema.registry.url=http://localhost:8081 \
        --topic copy_of_s3_topic \
        --from-beginning | jq '.'
    
  8. The response should be 18 records as follows.

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
    

Troubleshooting Connector and Task Failures

Stack Trace

You can use the Kafka Connect REST Interface to check the status of the connectors and tasks. If a task or connector has failed, the trace field includes a reason and a stack trace.
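
For example, the following sketch queries the status endpoint for a connector named s3-source on a Connect worker listening on the default REST port 8083:

curl -s http://localhost:8083/connectors/s3-source/status | jq '.'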

Stack Trace message: No new files ready after scan task…

The Kafka Connect Amazon S3 Source connector only reads data exported to an S3 bucket by the Kafka Connect Amazon S3 Sink connector. The connector always ignores a file whose name is not in the <topic>+<partition>+<offset>.<extension> format. This message occurs when the files in the S3 directory do not match the <topic>+<partition>+<offset>.<extension> pattern expected by the source connector.

Fewer Partitions in Destination Cluster

If there are fewer partitions in the destination cluster than in the source topic, the connector task throws an exception and immediately stops when it tries to write to a Kafka partition that does not exist. You will see the following error messages in the Connect worker log. The recommended practice is to create topics manually in the destination Kafka cluster with the correct number of partitions before running the source connector.

INFO WorkerSourceTask{id=s3-source-0} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask:409)
INFO WorkerSourceTask{id=s3-source-0} flushing 1 outstanding messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask:426)
ERROR WorkerSourceTask{id=s3-source-0} Failed to flush, timed out while waiting
for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:431)
ERROR WorkerSourceTask{id=s3-source-0} Failed to commit offsets
(org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)

Error Handling

The behavior.on.error configuration property sets how the connector handles errors.

  • fail: The connector stops processing when an error occurs. The full batch of records will not be sent to Kafka if any record in the batch is corrupted.
  • ignore: The corrupted record is ignored. The connector continues processing the next record. For Avro, the connector ignores the file containing a corrupted record and continues processing records for the next file.
  • log: Logs an error message and continues processing the next record. For Avro, the connector ignores the file containing a corrupted record and continues processing records for the next file.
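
For example, to log corrupted records and keep processing rather than failing the connector, you might add the following to the connector configuration (a sketch using the behavior.on.error property described above):

behavior.on.error=log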

Note

The connector always ignores a file which is not in <topic>+<partition>+<offset>.<extension> format.