Generalized Amazon S3 Source Connector for Confluent Platform

The Generalized Kafka Connect Amazon S3 Source connector can read files under an S3 bucket regardless of their naming convention; the filenames do not have to follow a specific format. As long as the files are in one of the supported formats (for example, JSON, Avro, or Byte Array), the connector can read them.

Features

The Generalized Amazon S3 Source connector includes the following features:

At least once delivery

In the event of a task failure, the connector guarantees that no messages are lost, although the last few messages may be processed again.

Multiple tasks

The Generalized Amazon S3 Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to significant performance gains when multiple files need to be parsed.
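For illustration, a partial connector configuration that parallelizes reading across four tasks might look like the following sketch (the connector name is hypothetical, and the right tasks.max value depends on your workload and available Connect worker capacity):

# Partial configuration sketch; tune tasks.max to your workload.
name=s3-source-parallel
connector.class=io.confluent.connect.s3.source.S3SourceConnector
tasks.max=4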

Limitations

The Generalized S3 Source connector has the following limitations:

  • The connector is incompatible with the Connect eager rebalancing protocol.
  • The connector won’t reload data during the following scenarios:
    • Renaming a file that the connector has already read.
    • Uploading a newer version of a file with new records.

You should also be aware of the following connector actions:

  • The connector ignores any S3 object with a name that does not start with the configured topics.dir directory (topics/ by default).
  • The connector uses the connector name to store offsets that track how much of the bucket it has processed. Deleting a connector and then creating a new one with the same name does not cause reprocessing from the beginning; the new connector resumes from the original connector's progress unless the corresponding entry in the offset topic is cleared.
  • For a new bucket, you need to create a new connector with an unused name. If you reconfigure an existing connector to source from the new bucket, or create a connector with a name that is already used by another connector, the connector will not source from the beginning of the data stored in the bucket, because offsets are tied to the connector name.

Install the Amazon S3 Source Connector

Important

  • If you’re upgrading from a previous version of the S3 Source connector, be sure to configure the connector to skip files in your buckets that have already been processed, because the partition offsets used by the Backup and Restore S3 Source connector are no longer used by the Generalized S3 Source connector.
  • Version 2.0.0 and later of Storage Source connectors do not support CP versions earlier than version 6.0.0.

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

For full functionality, the Generalized S3 Source connector requires the following AWS permissions:

  • ListBucket
  • GetObject
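
As a sketch, an IAM policy granting these permissions on a single bucket could look like the following (the bucket name is a placeholder; your security requirements may call for tighter scoping):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::your-bucket-name"
    },
    {
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::your-bucket-name/*"
    }
  ]
}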
In addition to the AWS permissions, the following prerequisites apply:

  • You must install the connector on every machine where Connect will run.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-s3-source:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-s3-source:1.0.0-preview
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent license properties for license properties and information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Amazon S3 Source Connector Configuration Properties.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

S3 Object Formats

The Generalized S3 Source connector can read different file formats in S3 and serialize them into Kafka records. This is controlled by the connector’s format.class configuration property, which has several options:

  • Avro: Use format.class=io.confluent.connect.s3.format.avro.AvroFormat to source Avro container files.
  • JSON: Use format.class=io.confluent.connect.s3.format.json.JsonFormat to source JSON files. Supported JSON formats are line-delimited JSON, record separator-delimited JSON, and concatenated JSON.
  • Raw Bytes: Use format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat to parse the S3 object content as raw bytes. The default line separator is the newline character, but this can be customized with the format.bytearray.separator configuration property (see the configuration sketch after this list).
  • Strings: Use format.class=io.confluent.connect.s3.format.string.StringFormat to parse the S3 object content as Strings.
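
For example, the following configuration sketch reads S3 object content as raw bytes and splits records on a semicolon instead of the default newline (the separator value is an arbitrary illustration):

format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
format.bytearray.separator=;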

AWS Credentials

The following sections provide information about how to configure an S3 connector to provide credentials when connecting to AWS.

Credentials provider chain

By default, the S3 connector looks for S3 credentials in the following locations and in the following order:

  1. The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables accessible to the Connect worker processes where the connector will be deployed. These variables are recognized by the AWS CLI and all AWS SDKs (except for the AWS SDK for .NET). You use export to set these variables.

    export AWS_ACCESS_KEY_ID=<your_access_key_id>
    export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>
    

    The AWS_ACCESS_KEY and AWS_SECRET_KEY variables can be used instead, but they are not recognized by the AWS CLI.

  2. The aws.accessKeyId and aws.secretKey Java system properties on the Connect worker processes where the connector will be deployed. However, these properties are only recognized by the AWS SDK for Java and are not recommended.

  3. The ~/.aws/credentials file located in the home directory of the operating system user that runs the Connect worker processes. These credentials are recognized by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

    aws configure
    

    You can also manually create the credentials file using a text editor. The file should contain lines in the format shown in the example below. See AWS Credentials File Format for additional details.

    [default]
    aws_access_key_id = <your_access_key_id>
    aws_secret_access_key = <your_secret_access_key>
    

    Note

    When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user’s home directory. Otherwise, the S3 connector will not be able to find the credentials.

  4. A query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials. This is applicable only if the Connect worker processes are running in AWS containers.

  5. A metadata query that returns credentials from an EC2 instance. This is applicable only if the Connect worker processes are running in EC2 instances.

Choose one of the above to define the AWS credentials that the S3 connectors use, verify the credentials implementation is set correctly, and then restart all of the Connect worker processes.

Note

Confluent recommends using either environment variables or a credentials file because these are the most straightforward, and they can be checked using the AWS CLI tool before running the connector.
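
For example, you can confirm from a Connect worker host that credentials resolve and that the bucket is readable before starting the connector (the bucket name is a placeholder):

aws sts get-caller-identity
aws s3 ls s3://your-bucket-name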

All S3 connectors that run in a single Connect worker cluster use the same credentials. This is sufficient for many use cases. If you want more control, refer to the following section to learn more about controlling and customizing how the S3 connector gets AWS credentials.

Caution

If you configure one of the AWS key and AWS secret key implementations (as detailed above), credentials cannot be supplied through the following credentials providers or by using the Trusted Account Credentials implementation. Attempting to provide credentials using multiple implementations causes authentication failure.

Credentials providers

A credentials provider is a Java class that implements the com.amazonaws.auth.AWSCredentialsProvider interface in the AWS Java library and returns AWS credentials from the environment. By default, the S3 connector configuration property s3.credentials.provider.class uses the com.amazonaws.auth.DefaultAWSCredentialsProviderChain class. This class and interface implementation chains together five other credential provider classes (a sketch of overriding this default follows the list below).

The com.amazonaws.auth.DefaultAWSCredentialsProviderChain implementation looks for credentials in the following order:

  1. Environment variables using the com.amazonaws.auth.EnvironmentVariableCredentialsProvider class implementation. This implementation uses environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. Environment variables AWS_ACCESS_KEY and AWS_SECRET_KEY are also supported by this implementation; however, these two variables are only recognized by the AWS SDK for Java and are not recommended.

  2. Java system properties using the com.amazonaws.auth.SystemPropertiesCredentialsProvider class implementation. This implementation uses Java system properties aws.accessKeyId and aws.secretKey.

  3. Credentials file using the com.amazonaws.auth.profile.ProfileCredentialsProvider class implementation. This implementation uses a credentials file located in the path ~/.aws/credentials. This credentials provider can be used by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

    aws configure
    

    You can also manually create the credentials file using a text editor. The file should contain lines in the format shown in the example below. See AWS Credentials File Format for additional details.

    [default]
    aws_access_key_id = <your_access_key_id>
    aws_secret_access_key = <your_secret_access_key>
    

    Note

    When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user’s home directory. Otherwise, the S3 connector will not be able to find the credentials.

  4. Amazon Elastic Container Service (ECS) container credentials using the com.amazonaws.auth.ContainerCredentialsProvider class implementation. This implementation uses a query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials for the S3 connector. For this provider to work, the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI must be set. See IAM Roles for Tasks for additional information about setting up this query.

  5. EC2 instance profile credentials using the com.amazonaws.auth.InstanceProfileCredentialsProvider class implementation. EC2 instance metadata is queried for credentials. See Amazon EC2 metadata service for additional information about instance metadata queries. See Working with AWS credentials for additional information and updates from AWS.

    Note

    EC2 instance profile credentials can be used only if the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is not set. See com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper for more information.
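
If you want the connector to use one specific provider rather than the full default chain, you can set the provider class explicitly. The following is a sketch that pins the connector to the credentials-file provider; it assumes the chosen class is on the Connect worker classpath and has a no-argument constructor (the AWS SDK provider classes listed above do):

s3.credentials.provider.class=com.amazonaws.auth.profile.ProfileCredentialsProvider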

Using Trusted Account Credentials

This connector can assume a role and use credentials from a separate trusted account. This is a default feature provided with recent versions of this connector that include an updated version of the AWS SDK.

Important

You cannot use assumed role credentials to access AWS through a proxy server without first passing environment variables or system properties. This is due to an AWS SDK limitation.

After you create the trust relationship, an IAM user or an application from the trusted account can use the AWS Security Token Service (AWS STS) AssumeRole API operation. This operation provides temporary security credentials that enable access to AWS resources for the connector. For details, see Creating a Role to Delegate Permissions to an IAM User.

Example:
Profile in ~/.aws/credentials:

[default]
role_arn=arn:aws:iam::037803949979:role/kinesis_cross_account_role
source_profile=staging
role_session_name = OPTIONAL_SESSION_NAME

[staging]
aws_access_key_id = <STAGING KEY>
aws_secret_access_key = <STAGING SECRET>

To allow the connector to assume a role with the right permissions, set the Amazon Resource Name (ARN) for this role. Additionally, you must choose either source_profile or credential_source as the way to obtain credentials that have permission to assume the role in the environment where the connector is running.

Note

When setting up trusted account credentials, be aware that the approach of loading profiles from both ~/.aws/credentials and ~/.aws/config does not work when configuring this connector. Assumed role settings and credentials must be placed in the ~/.aws/credentials file.

Additionally, the connector implements the AwsAssumeRoleCredentialsProvider class, which means you can use the following configuration properties to configure the assume role operation:

s3.credentials.provider.class=AwsAssumeRoleCredentialsProvider
sts.role.arn=arn:aws:iam::012345678901:role/my-restricted-role
sts.role.session.name=session-name
sts.role.external.id=external-id

Using Other Implementations

You can use a different credentials provider. To do this, set the s3.credentials.provider.class property to the name of any class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.

Complete the following steps to use a different credentials provider:

  1. Find or create a Java credentials provider class that implements the com.amazonaws.auth.AWSCredentialsProvider interface (a minimal sketch follows these steps).

  2. Put the class file in a JAR file.

  3. Place the JAR file in the share/java/kafka-connect-s3 directory on all Connect workers.

  4. Restart the Connect workers.

  5. Change the S3 connector property file to use your custom credentials. Add the provider class entry s3.credentials.provider.class=<className> in the S3 connector properties file.

    Important

    You must use the fully-qualified class name in the <className> entry.
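
The following is a minimal sketch of such a class, assuming the AWS SDK for Java v1 is on the classpath. The package, class, and environment variable names here are hypothetical; a production implementation would typically fetch credentials from a secrets manager rather than plain environment variables.

package com.example.credentials;  // hypothetical package

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;

/**
 * Hypothetical provider that reads static credentials from custom
 * environment variables on the Connect worker.
 */
public class MyCustomCredentialsProvider implements AWSCredentialsProvider {

    @Override
    public AWSCredentials getCredentials() {
        // Resolve the keys from environment variables chosen for this example.
        String accessKeyId = System.getenv("MY_S3_ACCESS_KEY_ID");
        String secretKey = System.getenv("MY_S3_SECRET_ACCESS_KEY");
        return new BasicAWSCredentials(accessKeyId, secretKey);
    }

    @Override
    public void refresh() {
        // Nothing is cached in this sketch, so there is nothing to refresh.
    }
}

With the JAR deployed as described in the steps above, the connector properties would then reference the hypothetical class as s3.credentials.provider.class=com.example.credentials.MyCustomCredentialsProvider.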

Quick Start

In the following example, the Generalized S3 Source connector reads all data listed under a specific S3 bucket and then loads it into a Kafka topic. You may use any file naming convention when writing data to the S3 bucket.

  1. Upload the following data under a folder named quickstart within the targeted S3 bucket. In this example, the JSON format is used, which supports line-delimited JSON, concatenated JSON, and a JSON array of records.

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
    
  2. Install the connector by running the following command from your Confluent Platform installation directory:

    confluent connect plugin install confluentinc/kafka-connect-s3-source:latest
    
  3. Create a quickstart-s3source-generalized.properties file with the following contents:

    name=quick-start-s3-source
    connector.class=io.confluent.connect.s3.source.S3SourceConnector
    tasks.max=1
    value.converter=org.apache.kafka.connect.json.JsonConverter
    mode=GENERIC
    topics.dir=quickstart
    format.class=io.confluent.connect.s3.format.json.JsonFormat
    topic.regex.list=quick-start-topic:.*
    s3.bucket.name=healthcorporation
    value.converter.schemas.enable=false
    
  4. Load the Generalized S3 Source connector.

    Tip

    The syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services connect connector load quick-start-s3-source --config quickstart-s3source-generalized.properties
    
  5. Confirm the connector is in a RUNNING state:

    confluent local services connect connector status quick-start-s3-source
    
  6. Confirm that the messages are being sent to Kafka.

    kafka-console-consumer \
        --bootstrap-server localhost:9092 \
        --topic quick-start-topic \
        --from-beginning
    
  7. The response should be 9 records as shown in the following example:

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
    

Troubleshooting Connector and Task Failures

Stack Trace

You can use the Kafka Connect REST Interface to check the status of the connectors and tasks. If a task or connector has failed, the trace field includes a reason and a stack trace.
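
For example, the following request returns the connector and task status, including any trace field, assuming the Connect REST interface is reachable on its default port (8083) on localhost and using the connector name from the quick start:

curl -s http://localhost:8083/connectors/quick-start-s3-source/status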

Stack Trace message: No new files ready after scan task…

If this message is displayed, complete the following steps:

  1. Review your topics.dir configuration property to ensure you have configured the right folder under the S3 bucket. If you do not set this property, the connector expects the data to be under the default folder, topics.
  2. Review the topic.regex.list configuration property to ensure the regular expressions map the files in your S3 bucket to the intended Kafka topics (see the sketch after these steps).
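
As an illustration, building on the <topic>:<regex> form used in the quick start and assuming the comma-separated list form of this property, a sketch that routes objects containing orders and invoices in their names to separate topics might look like the following (the folder, topic names, and patterns are hypothetical):

topics.dir=quickstart
topic.regex.list=orders-topic:.*orders.*,invoices-topic:.*invoices.*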