Amazon SQS Source Connector for Confluent Platform

The Kafka Connect Simple Queue Service (SQS) Source Connector is used to move messages from AWS SQS Queues into Apache Kafka®. It supports both Standard and FIFO queues.

This connector polls an SQS queue, converts SQS messages into Kafka records, and pushes the records into a Kafka topic.

Each SQS message is converted into exactly one Kafka record, with the following structure:

  1. The key encodes the SQS queue name and message ID in a struct. For FIFO queues, it also includes the message group ID.
  2. The value encodes the body of the SQS message and various message attributes in a struct.
  3. Each header encodes a message attribute that may be present in the SQS message.

The schema for the key, value, and headers is described in the Record Schema section.

For standard queues, this connector supports best-effort ordering guarantees, which means records may end up in a different order in Kafka. The connector also supports at-least-once delivery guarantees, which means it may introduce duplicate records in Kafka.

For FIFO queues, this connector guarantees records are inserted into Kafka in the order they were inserted in SQS, as long as the destination Kafka topic has exactly one partition. If the destination topic has more than one partition, you can write a Single Message Transform (SMT) to set the partition based on the MessageGroupId field in the key.
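The partitioning logic such a transform would apply can be sketched as follows. This is a minimal illustration only, not a complete SMT; the class and method names are hypothetical, and it assumes the transform has already extracted the MessageGroupId field from the record key:

```java
// Hypothetical helper illustrating how an SMT could map an SQS
// MessageGroupId to a fixed Kafka partition, so that all messages
// in the same group land on the same partition and keep their order.
public class GroupIdPartitioner {

    /**
     * Derive a stable partition from the SQS MessageGroupId.
     * Records sharing a group ID always map to the same partition.
     */
    public static int partitionFor(String messageGroupId, int numPartitions) {
        if (messageGroupId == null) {
            return 0; // standard-queue messages carry no group ID
        }
        // Math.floorMod avoids a negative result for negative hash codes
        return Math.floorMod(messageGroupId.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int p = partitionFor("FuelReport-0001", 6);
        System.out.println(p == partitionFor("FuelReport-0001", 6)); // stable: prints true
        System.out.println(p >= 0 && p < 6);                         // in range: prints true
    }
}
```

An SMT built around this logic would call `record.newRecord(...)` with the computed partition; only the hash-to-partition mapping is shown here.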

Features

The SQS Source connector offers the following features:

  • At-Least-Once Delivery: The connector guarantees that messages from SQS are delivered at least once to the Kafka topic.

  • Automatic Retries: The SQS source connector may experience problems connecting to the SQS endpoint due to network issues or, in rare cases, due to an AWS outage. The connector automatically retries polling SQS. The property sqs.max.retries controls how many retries are attempted.

    The connector internally uses the AWS Java SDK. The AWS Java SDK uses an exponential backoff strategy, such that there is an increasing delay between subsequent retries. For more information, refer to Error Retries and Exponential Backoff in the AWS documentation.

  • Supports HTTPS Proxy: The connector can connect to SQS through an HTTPS proxy server. To configure the proxy, set sqs.proxy.url, sqs.proxy.user, and sqs.proxy.password in the configuration file.

  • Long Polling: This connector uses long polling to retrieve messages from SQS. By default, it waits up to 20 seconds for a message to become available in SQS. You can set sqs.waittime.seconds to a value between 1 and 20 seconds.

  • Fetch Multiple Messages: In every poll cycle, the connector fetches up to sqs.messages.max messages. By default, this value is 10, which is the maximum. This setting works well for most use cases. However, if your messages are exceptionally large, you may want to reduce this value.
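The polling-related settings above can be combined in the connector's properties file; for example (the values shown are illustrative, not defaults):

```properties
# Wait up to 10 seconds for a message to arrive (long polling)
sqs.waittime.seconds=10
# Fetch fewer messages per poll cycle when individual messages are large
sqs.messages.max=5
# Retry failed SQS calls up to 5 times, with exponential backoff
sqs.max.retries=5
```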

Install the SQS Source Connector

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-sqs:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-sqs:6.0.0

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Amazon SQS Source Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.

Quick Start

In this quick start guide, you use the SQS source connector to export messages from an SQS FIFO queue to a Kafka topic.

Prerequisites

  • Confluent Platform is installed and services are running by using the Confluent CLI. This quick start assumes that you are using the Confluent CLI, but standalone installations are also supported. By default ZooKeeper, Kafka, Schema Registry, Connect, and the Connect REST API are started with the confluent local services start command. For more information, see Quick Start for Apache Kafka using Confluent Platform (Local).
  • You must install the AWS CLI tools and configure them by running the aws configure command.
  • Make sure that the IAM user or role you configure has full access to SQS.
  • Create a Kafka topic called sqs-quickstart.

Create a FIFO queue by running the following command:

aws sqs create-queue --queue-name sqs-source-connector-demo

You should see output similar to the following:

{
  "QueueUrl": "https://queue.amazonaws.com/940887362971/sqs-source-connector-demo"
}

Next, add some records to this newly created queue. Create a file called send-message-batch.json with the following content:

[
  {
      "Id":"FuelReport-0001-2015-09-16T140731Z",
      "MessageBody":"Fuel report for account 0001 on 2015-09-16 at 02:07:31 PM.",
      "DelaySeconds":10,
      "MessageAttributes":{
        "SellerName":{
            "DataType":"String",
            "StringValue":"Example Store"
        },
        "City":{
            "DataType":"String",
            "StringValue":"Any City"
        },
        "Region":{
            "DataType":"String",
            "StringValue":"WA"
        },
        "PostalCode":{
            "DataType":"String",
            "StringValue":"99065"
        },
        "PricePerGallon":{
            "DataType":"Number",
            "StringValue":"1.99"
        }
      }
  },
  {
      "Id":"FuelReport-0002-2015-09-16T140930Z",
      "MessageBody":"Fuel report for account 0002 on 2015-09-16 at 02:09:30 PM.",
      "DelaySeconds":10,
      "MessageAttributes":{
        "SellerName":{
            "DataType":"String",
            "StringValue":"Example Fuels"
        },
        "City":{
            "DataType":"String",
            "StringValue":"North Town"
        },
        "Region":{
            "DataType":"String",
            "StringValue":"WA"
        },
        "PostalCode":{
            "DataType":"String",
            "StringValue":"99123"
        },
        "PricePerGallon":{
            "DataType":"Number",
            "StringValue":"1.87"
        }
      }
  }
]

Then run the following command:

aws sqs send-message-batch --queue-url https://queue.amazonaws.com/940887362971/sqs-source-connector-demo --entries file://send-message-batch.json

Next, load the SQS Source Connector:

Tip

Before starting the connector, verify that the configuration parameters in etc/kafka-connect-sqs/quickstart-sqs.properties are properly set. In particular, make sure that sqs.url points to the correct SQS URL. The sqs.url parameter format is: sqs.url=https://sqs.<region-code>.amazonaws.com/<account_no>/<queue-name>.fifo. For example, if the AWS CLI returns the queue URL https://eu-central-1.queue.amazonaws.com/829250931565/sqs-source-connector-demo, the sqs.url for the SQS source connector is https://sqs.eu-central-1.amazonaws.com/829250931565/sqs-source-connector-demo.
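For reference, a quickstart-sqs.properties file matching the connector configuration shown later in this quick start would look similar to the following (the topic name and queue URL placeholders are illustrative):

```properties
name=sqs-source
connector.class=io.confluent.connect.sqs.source.SqsSourceConnector
tasks.max=1
kafka.topic=test-sqs-source
sqs.url=https://sqs.<region-code>.amazonaws.com/<account_no>/<queue-name>.fifo
```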

Important

Make sure that the sqs.url configuration parameter points to the correct SQS URL.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

confluent local services connect connector load sqs-source

Your output should resemble:

{
  "name": "sqs-source",
  "config": {
    "connector.class": "io.confluent.connect.sqs.source.SqsSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "test-sqs-source",
    "sqs.url": "https://sqs.us-east-1.amazonaws.com/942288736285822/sqs-fifo-queue.fifo",
    "name": "sqs-source"
  },
  "tasks": [],
  "type": null
}

After the connector finishes ingesting data to Kafka, check that the data is available in the Kafka topic.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-sqs-source --from-beginning

You should see 2 records, similar to the following:

{
  "schema":{
      "type":"struct",
      "fields":[
        {
            "type":"int64",
            "optional":false,
            "field":"ApproximateFirstReceiveTimestamp"
        },
        {
            "type":"int32",
            "optional":false,
            "field":"ApproximateReceiveCount"
        },
        {
            "type":"string",
            "optional":false,
            "field":"SenderId"
        },
        {
            "type":"int64",
            "optional":false,
            "field":"SentTimestamp"
        },
        {
            "type":"string",
            "optional":true,
            "field":"MessageDeduplicationId"
        },
        {
            "type":"string",
            "optional":true,
            "field":"MessageGroupId"
        },
        {
            "type":"string",
            "optional":true,
            "field":"SequenceNumber"
        },
        {
            "type":"string",
            "optional":false,
            "field":"Body"
        }
      ],
      "optional":false,
      "version":1
  },
  "payload":{
      "ApproximateFirstReceiveTimestamp":1563430750668,
      "ApproximateReceiveCount":2,
      "SenderId":"AIDA5WEKBZWN3QYIY7KAJ",
      "SentTimestamp":1563430591780,
      "MessageDeduplicationId":null,
      "MessageGroupId":null,
      "SequenceNumber":null,
      "Body":"Fuel report for account 0001 on 2015-09-16 at 02:07:31 PM."
  }
}

SQS Credentials

The following sections provide information about configuring SQS credentials.

Export the AWS region

Before configuring credentials, you must export the AWS region. The environment variable must be exported where the Connect worker is running and where the connector is deployed.

export AWS_REGION=<your-aws-region>

Credentials providers

By default, the SQS source connector looks for SQS credentials in the following locations and in the following order:

  1. The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables accessible to the Connect worker processes where the connector will be deployed. These variables are recognized by the AWS CLI and all AWS SDKs (except for the AWS SDK for .NET). You use export to set these variables.

    export AWS_ACCESS_KEY_ID=<your_access_key_id>
    export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>
    

    The AWS_ACCESS_KEY and AWS_SECRET_KEY variables can be used instead, but they are not recognized by the AWS CLI.

  2. The aws.accessKeyId and aws.secretKey Java system properties on the Connect worker processes where the connector will be deployed. However, these properties are only recognized by the AWS SDK for Java and are not recommended.

  3. The ~/.aws/credentials file located in the home directory of the operating system user that runs the Connect worker processes. These credentials are recognized by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

    aws configure
    

    You can also manually create the credentials file using a text editor. The file should contain lines in the following format:

    [default]
    aws_access_key_id = <your_access_key_id>
    aws_secret_access_key = <your_secret_access_key>
    

    Note

    When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user’s home directory. Otherwise, the SQS connector will not be able to find the credentials.

    See AWS Credentials File Format for additional details.

  4. A query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials. This is applicable only if the Connect worker processes are running in AWS containers.

  5. A metadata query that returns credentials from an EC2 instance. This is applicable only if the Connect worker processes are running in EC2 instances.

Choose one of the above to define the AWS credentials that the SQS connectors use, verify the credentials implementation is set correctly, and then restart all of the Connect worker processes.

Note

Confluent recommends using either environment variables or a credentials file. These are the most straightforward options, and they can be checked using the AWS CLI tool before running the connector.

Using Trusted Account Credentials

This connector can assume a role and use credentials from a separate trusted account. This is a default feature provided with recent versions of this connector that include an updated version of the AWS SDK.

After you create the trust relationship, an IAM user or an application from the trusted account can use the AWS Security Token Service (AWS STS) AssumeRole API operation. This operation provides temporary security credentials that enable access to AWS resources for the connector. For details, see Creating a Role to Delegate Permissions to an IAM User.

Example:
Profile in ~/.aws/credentials:

[default]
role_arn=arn:aws:iam::037803949979:role/kinesis_cross_account_role
source_profile=staging
role_session_name = OPTIONAL_SESSION_NAME

[staging]
aws_access_key_id = <STAGING KEY>
aws_secret_access_key = <STAGING SECRET>

To allow the connector to assume a role with the right permissions, set the Amazon Resource Name (ARN) for this role. Additionally, you must choose between source_profile or credential_source as the way to get credentials that have permission to assume the role, in the environment where the connector is running.
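For example, when the Connect worker runs on an EC2 instance, the profile can use credential_source instead of source_profile. The following is an illustrative profile with placeholder values, not a configuration taken from this document:

```properties
[default]
role_arn = arn:aws:iam::<account_no>:role/<role-name>
credential_source = Ec2InstanceMetadata
```

With credential_source, the credentials used to assume the role are obtained from the named source (here, the EC2 instance metadata) rather than from a second profile.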

Note

When setting up trusted account credentials, be aware that the approach of loading profiles from both ~/.aws/credentials and ~/.aws/config does not work when configuring this connector. Assumed role settings and credentials must be placed in the ~/.aws/credentials file.

Custom Credentials Provider

All SQS connectors run in a single Connect worker cluster and use the same credentials. This is sufficient for many use cases. If you want more control, you can create a custom credentials provider.

A credentials provider is a Java class that implements the com.amazonaws.auth.AWSCredentialsProvider interface in the AWS Java library and returns AWS credentials from the environment. By default, the SQS connector configuration property sqs.credentials.provider.class uses the com.amazonaws.auth.DefaultAWSCredentialsProviderChain class, which looks for credentials in the five locations described in the previous section.

You can use a different credentials provider. To do this, set the sqs.credentials.provider.class property to the name of any class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.

Complete the following steps to use a different credentials provider:

  1. Find or create a Java credentials provider class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.
  2. Put the class in a JAR file.
  3. Place the JAR file in the share/java/kafka-connect-sqs directory on all Connect workers.
  4. Restart the Connect workers.
  5. Update the SQS connector properties file to use your custom credentials provider by adding the entry sqs.credentials.provider.class=<className>. Note that you must use the fully qualified class name.

Record Schema

The source connector creates records in the following format:

Key Schema

The Key is a struct with the following fields:

Field Name       Schema Type   Optional?   Description
QueueUrl         string        mandatory   The fully qualified URL of the SQS queue from which this record was generated.
MessageId        string        mandatory   The unique message ID of the message within SQS.
MessageGroupId   string        optional    For FIFO queues, the message group ID.

Value Schema

The Value is a struct with the following fields:

Field Name                         Schema Type   Optional?   Description
Body                               string        mandatory   The body of the SQS message.
ApproximateFirstReceiveTimestamp   int64         mandatory   The timestamp when this message was first received by SQS.
ApproximateReceiveCount            int32         mandatory   The number of times this message has been received by SQS.
SenderId                           string        mandatory   The IAM user or role that sent this message to SQS.
SentTimestamp                      int64         mandatory   The timestamp at which the sender sent the message to SQS. SentTimestamp <= ApproximateFirstReceiveTimestamp.
MessageDeduplicationId             string        optional    For FIFO queues, a unique ID that can be used to eliminate duplicate messages.
MessageGroupId                     string        optional    For FIFO queues, the message group that this message belongs to.
SequenceNumber                     string        optional    For FIFO queues, a globally unique sequence number generated by SQS.

Header Schema

Each message attribute in SQS is converted to a Header in Kafka.

  1. The key of the header is the key of the message attribute.
  2. The value of the header is the value of the message attribute.
  3. The schema of the header depends on the data type of the message attribute.
    1. String message attributes have string schema.
    2. Number message attributes also have string schema.
    3. Binary message attributes have bytes schema.
    4. Custom message attributes will either be string or bytes, depending on the type of custom attribute.
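The attribute-type-to-schema rules above can be sketched as a simple lookup. This is an illustration of the mapping, not the connector's actual implementation; the class and method names are hypothetical. It assumes SQS's convention that custom attribute types carry their base type as a prefix (for example, "Number.float" or "Binary.gif"):

```java
// Hypothetical helper illustrating how an SQS message attribute
// DataType maps to the Kafka Connect schema of the resulting header.
public class HeaderSchemaMapper {

    /**
     * Map an SQS message attribute DataType to the header schema type.
     * Binary attributes (including binary custom types) map to bytes;
     * String and Number attributes (and their custom variants) map to string.
     */
    static String schemaFor(String dataType) {
        if (dataType.startsWith("Binary")) {
            return "bytes";
        }
        return "string";
    }

    public static void main(String[] args) {
        System.out.println(schemaFor("String"));       // prints string
        System.out.println(schemaFor("Number"));       // prints string
        System.out.println(schemaFor("Binary"));       // prints bytes
        System.out.println(schemaFor("Binary.gif"));   // prints bytes
    }
}
```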