Amazon SQS Source Connector for Confluent Platform

The Kafka Connect Simple Queue Service (SQS) Source connector is used to move messages from AWS SQS Queues into Apache Kafka®. It supports both Standard and FIFO queues.

This connector polls an SQS queue, converts SQS messages into Kafka records, and pushes the records into a Kafka topic.

Each SQS message is converted into exactly one Kafka record, with the following structure:

  1. The key encodes the SQS queue name and message ID in a struct. For FIFO queues, it also includes the message group ID.
  2. The value encodes the body of the SQS message and various message attributes in a struct. See the Record schema section.
  3. Each header encodes a message attribute that may be present in the SQS message.

The schema for key, value and headers is described in the Record schema section.

For standard queues, this connector provides best-effort ordering, which means records may end up in Kafka in a different order than they arrived in SQS. The connector also provides at-least-once delivery, which means it can introduce duplicate records in Kafka.

For FIFO queues, this connector guarantees records are inserted into Kafka in the order they were inserted into SQS, as long as the destination Kafka topic has exactly one partition. If the destination topic has more than one partition, you can write a Single Message Transform (SMT) that sets the partition based on the MessageGroupId field in the key.
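A real partition router must be implemented as a Java SMT, but the partition-selection logic such a transform would apply can be sketched in a few lines. Python is used here purely for illustration, and the MD5-based hashing scheme is an assumption for the sketch, not what Kafka's default partitioner (murmur2) uses:

```python
import hashlib

def partition_for_group(message_group_id: str, num_partitions: int) -> int:
    """Map a FIFO MessageGroupId to a stable partition number.

    Illustrative only: an actual Kafka Connect SMT is a Java class,
    and this hash choice is arbitrary. What matters is that the
    mapping is deterministic, so all messages in the same group
    land on the same partition and per-group ordering is preserved.
    """
    digest = hashlib.md5(message_group_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# The same group always maps to the same partition.
assert partition_for_group("account-0001", 6) == partition_for_group("account-0001", 6)
```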

After SQS messages are processed and stored in the Kafka topic, the SQS Source connector sends delete requests to remove them from the queue.

Features

The SQS Source connector includes the following features:

At least once delivery

The connector guarantees that messages from SQS are delivered at least once to the Kafka topic.

Multiple tasks

The SQS Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks that poll the queue in parallel can improve throughput.

Automatic retries

The SQS Source connector may experience problems connecting to the SQS endpoint due to network issues or, on rare occasions, due to an AWS outage. The connector automatically retries polling SQS. The sqs.max.retries property controls how many retries are attempted.

The connector internally uses the AWS SDK for Java, which applies an exponential backoff strategy so that there is an increasing delay between subsequent retries. For more information, refer to Error Retries and Exponential Backoff in AWS.
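The retry-with-backoff behavior described above can be illustrated with a small sketch. The function and parameter names here are hypothetical; the real retry logic lives inside the AWS SDK and is configured through sqs.max.retries rather than written by hand:

```python
import random
import time

def poll_with_retries(poll, max_retries=3, base_delay=0.5, max_delay=20.0):
    """Retry a poll callable with exponential backoff and jitter.

    Sketch of the strategy the AWS SDK applies internally: the delay
    doubles after each failed attempt (capped at max_delay), with
    random jitter to avoid synchronized retries from many clients.
    """
    for attempt in range(max_retries + 1):
        try:
            return poll()
        except ConnectionError:
            if attempt == max_retries:
                raise  # retries exhausted; surface the error
            delay = min(max_delay, base_delay * (2 ** attempt))
            time.sleep(delay * random.uniform(0.5, 1.0))
```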

Supports HTTPS proxy

The connector can connect to SQS through an HTTPS proxy server. To configure the proxy, set sqs.proxy.url, sqs.proxy.user, and sqs.proxy.password in the configuration file.
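For example, a connector configuration fragment might look like the following, where the host, user, and password are placeholders:

```properties
# Hypothetical values: route SQS traffic through an HTTPS proxy.
sqs.proxy.url=https://proxy.example.com:8080
sqs.proxy.user=proxy-user
sqs.proxy.password=proxy-secret
```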

Long polling

This connector uses long polling to retrieve messages from SQS. By default, it waits up to 20 seconds for a message to become available in SQS. You can set sqs.waittime.seconds to any value between 1 and 20 seconds.

Fetch multiple messages

In every poll cycle, the connector fetches up to sqs.messages.max messages. By default, this value is 10, which is also the maximum. This setting works well for most use cases, but if your messages are exceptionally large, you may want to reduce this value.
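Together with long polling, a configuration fragment using the defaults described above would look like this:

```properties
# Wait up to 20 seconds for messages to arrive (long polling, the default).
sqs.waittime.seconds=20
# Fetch up to 10 messages per poll cycle (the default and the maximum).
sqs.messages.max=10
```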

Install the SQS Source connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.
  • Java 8+. Note that Java 8 is deprecated in versions 7.2 and later of Confluent Platform. For more details, view Java compatibility with Confluent Platform by version.
  • An installation of the Confluent Hub Client. Note that this is installed by default with Confluent Enterprise.
  • An installation of the latest connector version.

Install the connector using the Confluent CLI

To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-sqs:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-sqs:2.1.0

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

For license properties, see Confluent Platform license. For information about the license topic, refer to License topic configuration.

Configuration properties

For a complete list of configuration properties for this connector, see Configuration Reference for Amazon SQS Source Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Quick start

In this quick start guide, you use the SQS Source connector to export messages from an SQS FIFO queue to a Kafka topic. Before running the quick start, ensure the following:

  • Confluent Platform is installed and services are running by using the Confluent CLI. This quick start assumes that you are using the Confluent CLI, but standalone installations are also supported. By default ZooKeeper, Kafka, Schema Registry, Connect, and the Connect REST API are started with the confluent local services start command. For more information, see Quick Start for Apache Kafka using Confluent Platform (Local). Note that as of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments.
  • You must install the AWS CLI and configure it by running the aws configure command.
  • Ensure the IAM user or role you configure has full access to SQS.
  • Create a Kafka topic called sqs-quickstart.
  1. Create a FIFO queue by running the following command. Note that FIFO queue names must end with the .fifo suffix, and the FifoQueue attribute must be set:

    aws sqs create-queue --queue-name sqs-source-connector-demo.fifo --attributes FifoQueue=true
    

    You should see output similar to the following:

    {
      "QueueUrl": "https://queue.amazonaws.com/940887362971/sqs-source-connector-demo.fifo"
    }
    
  2. Add some records to the newly created queue by first creating a file called send-message-batch.json with the following content:

    [
      {
          "Id":"FuelReport-0001-2015-09-16T140731Z",
          "MessageBody":"Fuel report for account 0001 on 2015-09-16 at 02:07:31 PM.",
          "DelaySeconds":10,
          "MessageAttributes":{
            "SellerName":{
                "DataType":"String",
                "StringValue":"Example Store"
            },
            "City":{
                "DataType":"String",
                "StringValue":"Any City"
            },
            "Region":{
                "DataType":"String",
                "StringValue":"WA"
            },
            "PostalCode":{
                "DataType":"String",
                "StringValue":"99065"
            },
            "PricePerGallon":{
                "DataType":"Number",
                "StringValue":"1.99"
            }
          }
      },
      {
          "Id":"FuelReport-0002-2015-09-16T140930Z",
          "MessageBody":"Fuel report for account 0002 on 2015-09-16 at 02:09:30 PM.",
          "DelaySeconds":10,
          "MessageAttributes":{
            "SellerName":{
                "DataType":"String",
                "StringValue":"Example Fuels"
            },
            "City":{
                "DataType":"String",
                "StringValue":"North Town"
            },
            "Region":{
                "DataType":"String",
                "StringValue":"WA"
            },
            "PostalCode":{
                "DataType":"String",
                "StringValue":"99123"
            },
            "PricePerGallon":{
                "DataType":"Number",
                "StringValue":"1.87"
            }
          }
      }
    ]
    
  3. Add the records to the queue by running the following command:

    aws sqs send-message-batch --queue-url https://queue.amazonaws.com/940887362971/sqs-source-connector-demo.fifo --entries file://send-message-batch.json
    
  4. Load the SQS Source connector. Note that you must ensure the sqs.url configuration parameter points to the correct SQS URL. The sqs.url parameter format is:

    sqs.url=https://sqs.<region-code>.amazonaws.com/<account_no>/<queue-name>.fifo

    For example, if the AWS CLI returns the queue URL: https://eu-central-1.queue.amazonaws.com/829250931565/sqs-source-connector-demo, the sqs.url for the SQS Source connector is https://sqs.eu-central-1.amazonaws.com/829250931565/sqs-source-connector-demo.

    confluent local services connect connector load sqs-source
    

    Your output should resemble:

    {
      "name": "sqs-source",
      "config": {
        "connector.class": "io.confluent.connect.sqs.source.SqsSourceConnector",
        "tasks.max": "1",
        "kafka.topic": "test-sqs-source",
        "sqs.url": "https://sqs.us-east-1.amazonaws.com/942288736285822/sqs-fifo-queue.fifo",
        "name": "sqs-source"
      },
      "tasks": [],
      "type": null
    }
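The load command reads the connector configuration from a properties file. A minimal sqs-source.properties matching the output above might look like the following; the queue URL is a placeholder you must replace with your own:

```properties
name=sqs-source
connector.class=io.confluent.connect.sqs.source.SqsSourceConnector
tasks.max=1
kafka.topic=test-sqs-source
sqs.url=https://sqs.<region-code>.amazonaws.com/<account_no>/<queue-name>.fifo
```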
    
  5. After the connector finishes ingesting data to Kafka, check that the data is available in the Kafka topic.

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-sqs-source --from-beginning
    

    You should see two records, similar to the following:

    {
      "schema":{
          "type":"struct",
          "fields":[
            {
                "type":"int64",
                "optional":false,
                "field":"ApproximateFirstReceiveTimestamp"
            },
            {
                "type":"int32",
                "optional":false,
                "field":"ApproximateReceiveCount"
            },
            {
                "type":"string",
                "optional":false,
                "field":"SenderId"
            },
            {
                "type":"int64",
                "optional":false,
                "field":"SentTimestamp"
            },
            {
                "type":"string",
                "optional":true,
                "field":"MessageDeduplicationId"
            },
            {
                "type":"string",
                "optional":true,
                "field":"MessageGroupId"
            },
            {
                "type":"string",
                "optional":true,
                "field":"SequenceNumber"
            },
            {
                "type":"string",
                "optional":false,
                "field":"Body"
            }
          ],
          "optional":false,
          "version":1
      },
      "payload":{
          "ApproximateFirstReceiveTimestamp":1563430750668,
          "ApproximateReceiveCount":2,
          "SenderId":"AIDA5WEKBZWN3QYIY7KAJ",
          "SentTimestamp":1563430591780,
          "MessageDeduplicationId":null,
          "MessageGroupId":null,
          "SequenceNumber":null,
          "Body":"Fuel report for account 0001 on 2015-09-16 at 02:07:31 PM."
      }
    }
    

SQS credentials

The following sections provide information about configuring SQS credentials.

Export the AWS region

Before configuring credentials, you must export the AWS region. The environment variable must be exported where the Connect worker is running and where the connector is deployed.

export AWS_REGION=<your-aws-region>

Credentials providers

The credentials provided must have permission for the actions sqs:ReceiveMessage and sqs:DeleteMessage. By default, the SQS source connector looks for SQS credentials in the following locations and in the following order:

  1. The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables accessible to the Connect worker processes where the connector will be deployed. These variables are recognized by the AWS CLI and all AWS SDKs (except for the AWS SDK for .NET). You use export to set these variables.

    export AWS_ACCESS_KEY_ID=<your_access_key_id>
    export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>
    

    The AWS_ACCESS_KEY and AWS_SECRET_KEY environment variables can be used instead, but they are not recognized by the AWS CLI.

  2. The aws.accessKeyId and aws.secretKey Java system properties on the Connect worker processes where the connector will be deployed. However, these properties are only recognized by the AWS SDK for Java and are not recommended.

  3. The ~/.aws/credentials file located in the home directory of the operating system user that runs the Connect worker processes. These credentials are recognized by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

    aws configure
    

    You can also manually create the credentials file using a text editor. The file should contain lines in the following format:

    [default]
    aws_access_key_id = <your_access_key_id>
    aws_secret_access_key = <your_secret_access_key>
    

    Note

    When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user’s home directory. Otherwise, the SQS connector will not be able to find the credentials.

    For more details, see AWS Credentials File Format.

  4. A query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials. This is applicable only if the Connect worker processes are running in AWS containers.

  5. A metadata query that returns credentials from an EC2 instance. This is applicable only if the Connect worker processes are running in EC2 instances.

Choose one of the above to define the AWS credentials that the SQS connectors use, verify the credentials implementation is set correctly, and then restart all of the Connect worker processes.

Note that Confluent recommends using either environment variables or a credentials file because they are the most straightforward and can be checked using the AWS CLI tool before running the connector.

Custom Credentials Provider

All SQS connectors run in a single Connect worker cluster and use the same credentials. This is sufficient for many use cases. If you want more control, you can create a custom credentials provider.

A credentials provider is a Java class that implements the com.amazonaws.auth.AWSCredentialsProvider interface in the AWS SDK for Java and returns AWS credentials from the environment. By default, the SQS connector configuration property sqs.credentials.provider.class uses the com.amazonaws.auth.DefaultAWSCredentialsProviderChain class, which looks for credentials in the five locations described in the previous section.

You can use a different credentials provider. To do this, set the sqs.credentials.provider.class property to the name of any class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.

Complete the following steps to use a different credentials provider:

  1. Find or create a Java credentials provider class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.
  2. Put the class file in a JAR file.
  3. Place the JAR file in the share/java/kafka-connect-sqs directory on all Connect workers.
  4. Restart the Connect workers.
  5. Change the SQS connector properties file to use your custom credentials provider by adding the entry sqs.credentials.provider.class=<className>. Note that you must use the fully qualified class name.
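The resulting properties entry might look like the following; the class name is a hypothetical example:

```properties
# Fully qualified name of a class implementing
# com.amazonaws.auth.AWSCredentialsProvider (example name only).
sqs.credentials.provider.class=com.example.MyCredentialsProvider
```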

Record schema

The source connector creates records in the following format:

Key schema

The Key is a struct with the following fields:

  • QueueUrl (String, required): The fully qualified SQS queue URL from which this record was generated.
  • MessageId (String, required): The unique message ID of the message within SQS.
  • MessageGroupId (String, optional): For FIFO queues, the message group ID.
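For example, the key for a record sourced from a FIFO queue might look like the following; all values are hypothetical:

```json
{
  "QueueUrl": "https://sqs.us-east-1.amazonaws.com/111111111111/orders.fifo",
  "MessageId": "c1f7b6a0-1234-5678-9abc-def012345678",
  "MessageGroupId": "account-0001"
}
```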

Value schema

The Value is a struct with the following fields:

  • Body (String, required): The body of the message.
  • ApproximateFirstReceiveTimestamp (int64, required): The timestamp when this message was first received by SQS.
  • ApproximateReceiveCount (int32, required): The number of times this message has been received by SQS.
  • SenderId (String, required): The IAM user or role that sent this message to SQS.
  • SentTimestamp (int64, required): The timestamp at which the sender sent the message to SQS. SentTimestamp <= ApproximateFirstReceiveTimestamp.
  • MessageDeduplicationId (String, optional): For FIFO queues, a unique ID that can be used to eliminate duplicate messages.
  • MessageGroupId (String, optional): For FIFO queues, the message group that this message belongs to.
  • SequenceNumber (String, optional): For FIFO queues, a globally unique sequence number generated by SQS.

Header schema

Each message attribute in SQS is converted to a Header in Kafka.

  1. The key of the header is the key of the message attribute.
  2. The value of the header is the value of the message attribute.
  3. The schema of the header depends on the data type of the message attribute.
    1. String message attributes have string schema.
    2. Number message attributes also have string schema.
    3. Binary message attributes have bytes schema.
    4. Custom message attributes will either be string or bytes, depending on the type of custom attribute.
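The type mapping above can be sketched as follows. This is a minimal illustration, not the connector's actual code; it relies on the fact that custom SQS attribute types take the form <BaseType>.<label> (for example, Number.float or Binary.gif), so only the prefix determines the header schema:

```python
def header_schema_for(data_type: str) -> str:
    """Return the Connect header schema type for an SQS attribute DataType.

    String and Number attributes (including custom variants such as
    "Number.float") map to a string schema; Binary attributes map to
    a bytes schema.
    """
    base = data_type.split(".", 1)[0]  # strip any custom ".label" suffix
    if base in ("String", "Number"):
        return "string"
    if base == "Binary":
        return "bytes"
    raise ValueError(f"unknown SQS attribute type: {data_type}")
```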