Amazon SQS Source Connector for Confluent Platform¶
The Kafka Connect Simple Queue Service (SQS) Source connector is used to move messages from AWS SQS Queues into Apache Kafka®. It supports both Standard and FIFO queues.
This connector polls an SQS queue, converts SQS messages into Kafka records, and pushes the records into a Kafka topic.
Each SQS message is converted into exactly one Kafka record, with the following structure:
- The key encodes the SQS queue name and message ID in a struct. For FIFO queues, it also includes the message group ID.
- The value encodes the body of the SQS message and various message attributes in a struct. See the Record schema section.
- Each header encodes one of the message attributes that may be present in the SQS message.
The schema for key, value and headers is described in the Record schema section.
For standard queues, this connector provides best-effort ordering, which means records may end up in Kafka in a different order than they were sent to SQS. The connector also provides at-least-once delivery, which means it may write duplicate records to Kafka.
For FIFO queues, this connector guarantees that records are inserted into Kafka in the order they were inserted into SQS, as long as the destination Kafka topic has exactly one partition. If the destination topic has more than one partition, you can write a single message transform (SMT) that sets the record's partition based on the MessageGroupId field in the key, as sketched below.
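The connector does not ship such a transform, so the following is only a minimal sketch of what one could look like, assuming the destination topic's partition count is supplied through a hypothetical partition.count property; the class name and property name are illustrative, not part of the connector:

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * Illustrative SMT that routes each record to a partition derived from the
 * MessageGroupId field in the record key. Not part of the SQS connector.
 */
public class MessageGroupPartitioner implements Transformation<SourceRecord> {

  private int partitionCount;

  @Override
  public void configure(Map<String, ?> configs) {
    // "partition.count" is a hypothetical property name used only in this sketch.
    partitionCount = Integer.parseInt(String.valueOf(configs.get("partition.count")));
  }

  @Override
  public SourceRecord apply(SourceRecord record) {
    Struct key = (Struct) record.key();
    String groupId = key == null ? null : key.getString("MessageGroupId");
    if (groupId == null) {
      // Standard-queue message: leave partition assignment to Kafka's default partitioner.
      return record;
    }
    // Map all records with the same message group ID to the same partition.
    int partition = Math.abs(groupId.hashCode() % partitionCount);
    return record.newRecord(
        record.topic(), partition, record.keySchema(), record.key(),
        record.valueSchema(), record.value(), record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef().define("partition.count", ConfigDef.Type.INT,
        ConfigDef.Importance.HIGH, "Number of partitions in the destination topic");
  }

  @Override
  public void close() {}
}

You would package a class like this in a JAR on the Connect worker plugin path and reference it from the connector configuration with the standard SMT properties, for example transforms=groupPartition and transforms.groupPartition.type=<fully qualified class name>.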
The SQS Source connector sends delete requests once the SQS messages are processed and stored in the Kafka topics.
Features¶
The SQS Source connector includes the following features:
- At least once delivery
- Multiple tasks
- Automatic retries
- Supports HTTPS proxy
- Long polling
- Fetch multiple messages
At least once delivery¶
The connector guarantees that messages from SQS are delivered at least once to the Kafka topic.
Multiple tasks¶
The SQS Source connector supports running one or more tasks. You can specify the number of tasks with the tasks.max configuration parameter. Running multiple tasks can improve performance when a large volume of messages must be consumed from the queue.
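For example, to run two tasks, you might add the following to the connector configuration (the value 2 is only an illustration; size it to your workload and the number of Connect workers):

tasks.max=2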
Automatic retries¶
The SQS Source connector may experience problems connecting to the SQS endpoint due to network issues or, on rare occasions, an AWS outage. In these cases, the connector automatically retries polling SQS. The sqs.max.retries property controls how many retries are attempted.
The connector internally uses the AWS SDK for Java, which applies an exponential backoff strategy so that the delay between subsequent retries increases. For more information, refer to Error Retries and Exponential Backoff in AWS.
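For example, to allow up to five retry attempts, you could add the following to the connector configuration (the value shown is illustrative):

sqs.max.retries=5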
Supports HTTPS proxy¶
The connector can connect to SQS through an HTTPS proxy server. To configure the proxy, set sqs.proxy.url, sqs.proxy.user, and sqs.proxy.password in the connector configuration.
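For example, the relevant entries in the connector configuration might look like the following; the proxy host, port, and credentials are placeholders:

sqs.proxy.url=https://proxy.example.com:8080
sqs.proxy.user=<proxy-username>
sqs.proxy.password=<proxy-password>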
Long polling¶
This connector uses long polling to retrieve messages from SQS. By default, the connector waits up to 20 seconds for a message to become available in SQS. You can set sqs.waittime.seconds to any value between 1 and 20 seconds.
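For example, to wait at most five seconds per poll, you could set the following (the value is illustrative):

sqs.waittime.seconds=5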
Fetch multiple messages¶
In every poll cycle, the connector fetches up to sqs.messages.max messages. By default, this value is 10, which is the maximum. This setting works well for most use cases. However, if your messages are exceptionally large, you may want to reduce this value.
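For example, to fetch at most five messages per poll cycle, you could set the following (the value is illustrative):

sqs.messages.max=5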
Install the SQS Source connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- Java 8+. Note that Java 8 is deprecated in versions 7.2 and later of Confluent Platform. For more details, view Java compatibility with Confluent Platform by version.
- An installation of the Confluent Hub Client. Note that this is installed by default with Confluent Enterprise.
- An installation of the latest (latest) connector version.
Install the connector using the Confluent CLI¶
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-sqs:latest
You can install a specific version by replacing latest with a version number as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-sqs:2.1.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, refer to License topic configuration.
Configuration properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Amazon SQS Source Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Quick start¶
In this quick start guide, you use the SQS Source connector to export messages from an SQS FIFO queue to a Kafka topic. Before running the quick start, ensure the following:
- Confluent Platform is installed and services are running by using the Confluent CLI. This quick start assumes that you are using the Confluent CLI, but standalone installations are also supported. By default, ZooKeeper, Kafka, Schema Registry, Connect, and the Connect REST API are started with the confluent local services start command. For more information, see Quick Start for Apache Kafka using Confluent Platform (Local). Note that as of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments.
- You must install the AWS CLI and configure it by running the aws configure command.
- Ensure the IAM user or role you configure has full access to SQS.
- Create a Kafka topic called sqs-quickstart. A sample command for this step is shown after this list.
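For the last prerequisite, you can create the topic with the kafka-topics tool bundled with Confluent Platform. The command below is a sketch that assumes a local broker listening on localhost:9092; a single partition keeps FIFO ordering intact, as described earlier.

bin/kafka-topics --bootstrap-server localhost:9092 --create --topic sqs-quickstart --partitions 1 --replication-factor 1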
Create a FIFO queue by running the following command:
aws sqs create-queue --queue-name sqs-source-connector-demo
You should see output similar to the following:
{ "QueueUrl": "https://queue.amazonaws.com/940887362971/sqs-source-connector-demo" }
Add some records to the newly created queue by first creating a file called send-message-batch.json with the following content:
[
  {
    "Id": "FuelReport-0001-2015-09-16T140731Z",
    "MessageBody": "Fuel report for account 0001 on 2015-09-16 at 02:07:31 PM.",
    "DelaySeconds": 10,
    "MessageAttributes": {
      "SellerName": { "DataType": "String", "StringValue": "Example Store" },
      "City": { "DataType": "String", "StringValue": "Any City" },
      "Region": { "DataType": "String", "StringValue": "WA" },
      "PostalCode": { "DataType": "String", "StringValue": "99065" },
      "PricePerGallon": { "DataType": "Number", "StringValue": "1.99" }
    }
  },
  {
    "Id": "FuelReport-0002-2015-09-16T140930Z",
    "MessageBody": "Fuel report for account 0002 on 2015-09-16 at 02:09:30 PM.",
    "DelaySeconds": 10,
    "MessageAttributes": {
      "SellerName": { "DataType": "String", "StringValue": "Example Fuels" },
      "City": { "DataType": "String", "StringValue": "North Town" },
      "Region": { "DataType": "String", "StringValue": "WA" },
      "PostalCode": { "DataType": "String", "StringValue": "99123" },
      "PricePerGallon": { "DataType": "Number", "StringValue": "1.87" }
    }
  }
]
Add the records to the queue by running the following command:
aws sqs send-message-batch --queue-url https://queue.amazonaws.com/940887362971/sqs-source-connector-demo --entries file://send-message-batch.json
Load the SQS Source connector. Note that you must ensure the sqs.url configuration parameter points to the correct SQS URL. The sqs.url parameter format is:
sqs.url=https://sqs.<region-code>.amazonaws.com/<account_no>/<topic-name.fifo>
For example, if the AWS CLI returns the queue URL https://eu-central-1.queue.amazonaws.com/829250931565/sqs-source-connector-demo, the sqs.url for the SQS Source connector is https://sqs.eu-central-1.amazonaws.com/829250931565/sqs-source-connector-demo.
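Before loading the connector, create its configuration. A minimal configuration for this quick start might resemble the following sketch; the property values, including the sqs.url, are placeholders that you must adapt to your queue, topic, and region:

name=sqs-source
connector.class=io.confluent.connect.sqs.source.SqsSourceConnector
tasks.max=1
kafka.topic=test-sqs-source
sqs.url=https://sqs.<region-code>.amazonaws.com/<account_no>/<queue-name>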
Run the following command to load the connector:
confluent local services connect connector load sqs-source
Your output should resemble:
{ "name": "sqs-source", "config": { "connector.class": "io.confluent.connect.sqs.source.SqsSourceConnector", "tasks.max": "1", "kafka.topic": "test-sqs-source", "sqs.url": "https://sqs.us-east-1.amazonaws.com/942288736285822/sqs-fifo-queue.fifo", "name": "sqs-source" }, "tasks": [], "type": null }
After the connector finishes ingesting data to Kafka, check that the data is available in the Kafka topic.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-sqs-source --from-beginning
You should see two records, similar to the following:
{ "schema":{ "type":"struct", "fields":[ { "type":"int64", "optional":false, "field":"ApproximateFirstReceiveTimestamp" }, { "type":"int32", "optional":false, "field":"ApproximateReceiveCount" }, { "type":"string", "optional":false, "field":"SenderId" }, { "type":"int64", "optional":false, "field":"SentTimestamp" }, { "type":"string", "optional":true, "field":"MessageDeduplicationId" }, { "type":"string", "optional":true, "field":"MessageGroupId" }, { "type":"string", "optional":true, "field":"SequenceNumber" }, { "type":"string", "optional":false, "field":"Body" } ], "optional":false, "version":1 }, "payload":{ "ApproximateFirstReceiveTimestamp":1563430750668, "ApproximateReceiveCount":2, "SenderId":"AIDA5WEKBZWN3QYIY7KAJ", "SentTimestamp":1563430591780, "MessageDeduplicationId":null, "MessageGroupId":null, "SequenceNumber":null, "Body":"Fuel report for account 0001 on 2015-09-16 at 02:07:31 PM." } }
SQS credentials¶
The following sections provide information about configuring SQS credentials.
Export the AWS region¶
Before configuring credentials, you must export the AWS region. The environment variable must be exported where the Connect worker is running and where the connector is deployed.
export AWS_REGION=<your-aws-region>
Credentials providers¶
The credentials provided must have permission for the actions sqs:ReceiveMessage and sqs:DeleteMessage.
By default, the SQS source connector looks for SQS credentials in the following
locations and in the following order:
- The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables accessible to the Connect worker processes where the connector will be deployed. These variables are recognized by the AWS CLI and all AWS SDKs (except for the AWS SDK for .NET). You use export to set these variables.
export AWS_ACCESS_KEY_ID=<your_access_key_id>
export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>
The AWS_ACCESS_KEY and AWS_SECRET_KEY variables can be used instead, but are not recognized by the AWS CLI.
- The aws.accessKeyId and aws.secretKey Java system properties on the Connect worker processes where the connector will be deployed. However, these variables are only recognized by the AWS SDK for Java and are not recommended.
- The ~/.aws/credentials file located in the home directory of the operating system user that runs the Connect worker processes. These credentials are recognized by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:
aws configure
You can also manually create the credentials file using a text editor. The file should contain lines in the following format:
[default]
aws_access_key_id = <your_access_key_id>
aws_secret_access_key = <your_secret_access_key>
Note
When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user’s home directory. Otherwise, the SQS connector will not be able to find the credentials.
For more details, see AWS Credentials File Format.
- A query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials. This is applicable only if the Connect worker processes are running in AWS containers.
- A metadata query that returns credentials from an EC2 instance. This is applicable only if the Connect worker processes are running in EC2 instances.
Choose one of the above to define the AWS credentials that the SQS connectors use, verify the credentials implementation is set correctly, and then restart all of the Connect worker processes.
Note that Confluent recommends using either environment variables or a credentials file because they are the most straightforward and can be checked using the AWS CLI tool before running the connector.
Custom credentials provider¶
All SQS connectors running in a single Connect worker cluster use the same credentials. This is sufficient for many use cases. If you want more control, you can create a custom credentials provider.
A credentials provider is a Java class that implements the com.amazonaws.auth.AWSCredentialsProvider interface in the AWS SDK for Java and returns AWS credentials from the environment. By default, the SQS connector configuration property sqs.credentials.provider.class uses the com.amazonaws.auth.DefaultAWSCredentialsProviderChain class, which looks for credentials in the five places described in the previous section.
You can use a different credentials provider. To do this, set the sqs.credentials.provider.class property to the name of any class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.
Complete the following steps to use a different credentials provider:
- Find or create a Java credentials provider class that implements the com.amazonaws.auth.AWSCredentialsProvider interface. A sketch of such a class follows this list.
- Put the class file in a JAR file.
- Place the JAR file in the share/java/kafka-connect-sqs directory on all Connect workers.
- Restart the Connect workers.
- Change the SQS connector properties file to use your custom credentials by adding the entry sqs.credentials.provider.class=<className>. Note that you must use the fully qualified class name.
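The following is a minimal sketch of such a class, assuming you want to read credentials from custom environment variables; the class name and variable names are illustrative only:

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;

/**
 * Illustrative credentials provider that reads credentials from
 * custom environment variables. Not part of the SQS connector.
 */
public class EnvSqsCredentialsProvider implements AWSCredentialsProvider {

  @Override
  public AWSCredentials getCredentials() {
    // SQS_ACCESS_KEY_ID and SQS_SECRET_ACCESS_KEY are hypothetical variable names.
    String accessKey = System.getenv("SQS_ACCESS_KEY_ID");
    String secretKey = System.getenv("SQS_SECRET_ACCESS_KEY");
    return new BasicAWSCredentials(accessKey, secretKey);
  }

  @Override
  public void refresh() {
    // Nothing cached; credentials are re-read on each call to getCredentials().
  }
}

After packaging a class like this and restarting the workers, the connector would reference it with sqs.credentials.provider.class=<your fully qualified class name>.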
Record schema¶
The source connector creates records in the following format:
Key schema¶
The Key is a struct with the following fields:
Field Name | Schema Type | Optional? | Description |
---|---|---|---|
QueueUrl | String | No | The fully qualified SQS queue URL from which this record was generated. |
MessageId | String | No | The unique message ID of the message within SQS. |
MessageGroupId | String | Yes | For FIFO queues, the message group ID. |
Value schema¶
The Value is a struct with the following fields:
Field Name | Schema Type | Optional? | Description |
---|---|---|---|
Body | String | No | The body of the message. |
ApproximateFirstReceiveTimestamp | int64 | No | The timestamp when this message was first received by SQS. |
ApproximateReceiveCount | int32 | No | The number of times this message was received by SQS. |
SenderId | String | No | The IAM user or role that sent this message to SQS. |
SentTimestamp | int64 | No | The timestamp at which the sender sent the message to SQS. SentTimestamp <= ApproximateFirstReceiveTimestamp. |
MessageDeduplicationId | String | Yes | For FIFO queues, a unique ID that can be used to eliminate duplicate messages. |
MessageGroupId | String | Yes | For FIFO queues, the message group that this message belongs to. |
SequenceNumber | String | Yes | For FIFO queues, a globally unique sequence number generated by SQS. |
Header schema¶
Each message attribute in SQS is converted to a Header in Kafka.
- The key of the header is the key of the message attribute.
- The value of the header is the value of the message attribute.
- The schema of the header depends on the data type of the message attribute.
- String message attributes have string schema.
- Number message attributes also have string schema.
- Binary message attributes have bytes schema.
- Custom message attributes will either be string or bytes, depending on the type of custom attribute.