Amazon SQS Source Connector for Confluent Platform¶
The Kafka Connect Simple Queue Service (SQS) Source connector is used to move messages from Amazon SQS queues into Apache Kafka®. It supports both Standard and FIFO queues.
This connector polls an SQS queue, converts SQS messages into Kafka records, and pushes the records into a Kafka topic.
Each SQS message is converted into exactly one Kafka record, with the following structure:
- The key encodes the SQS queue URL and message ID in a struct. For FIFO queues, it also includes the message group ID.
- The value encodes the body of the SQS message and several of its system attributes (such as SenderId and SentTimestamp) in a struct.
- Each header encodes one message attribute of the SQS message, if any message attributes are present.
The schema for key, value and headers is described in the Record Schema section.
For standard queues, this connector provides best-effort ordering: records may end up in Kafka in a different order than they were sent to SQS. The connector also provides at-least-once delivery, which means it can introduce duplicate records in Kafka.
For FIFO queues, this connector guarantees that records are inserted into Kafka in the order they were inserted into SQS, as long as the destination Kafka topic has exactly one partition. If the destination topic has more than one partition, you can write a single message transform (SMT) that sets the partition based on the MessageGroupId field in the key.
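A partitioning SMT would be a Java class implementing Kafka Connect's Transformation interface. The Python sketch below shows only the routing rule such a transform could apply, assuming that a stable hash of MessageGroupId modulo the partition count is acceptable. The function name partition_for_group and the choice of CRC32 are illustrative, not part of the connector, and CRC32 does not match Kafka's default murmur2-based partitioner. With this rule, ordering is preserved within each message group, not across groups.

```python
import zlib
from typing import Optional


def partition_for_group(message_group_id: Optional[str], num_partitions: int) -> Optional[int]:
    """Route every record of one FIFO message group to the same partition.

    Hypothetical helper: returns None when there is no MessageGroupId
    (standard queues), meaning "leave the partition unset".
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if message_group_id is None:
        return None
    # CRC32 is stable across processes and restarts, unlike Python's
    # built-in hash() for strings, which is randomized per process.
    return zlib.crc32(message_group_id.encode("utf-8")) % num_partitions


if __name__ == "__main__":
    for group in ["orders-eu", "orders-us", "orders-eu"]:
        print(group, "->", partition_for_group(group, num_partitions=6))
```

Any deterministic function of MessageGroupId works for ordering; what matters is that it never changes while the topic's partition count stays the same.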
The SQS Source connector sends delete requests to SQS once the messages have been processed and written to the Kafka topic.
Features¶
The SQS Source connector includes the following features:
- At least once delivery
- Multiple tasks
- Automatic retries
- Supports HTTPS proxy
- Long polling
- Fetch multiple messages
At least once delivery¶
The connector guarantees that messages from SQS are delivered at least once to the Kafka topic.
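Because duplicates are possible, consumers that cannot tolerate them can deduplicate on the record key. The Python sketch below assumes keys have already been deserialized into dicts with the QueueUrl and MessageId fields described in the Record Schema section, and relies on a redelivered SQS message keeping its MessageId. The class name MessageIdDeduplicator and the bounded in-memory window are illustrative; a real consumer would usually need durable state.

```python
from collections import OrderedDict
from typing import Any, Dict, Tuple


class MessageIdDeduplicator:
    """Remember the most recently seen (QueueUrl, MessageId) pairs."""

    def __init__(self, capacity: int = 10_000) -> None:
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self._capacity = capacity
        self._seen: "OrderedDict[Tuple[str, str], None]" = OrderedDict()

    def is_new(self, key: Dict[str, Any]) -> bool:
        """Return True the first time a key is seen, False for repeats."""
        ident = (key["QueueUrl"], key["MessageId"])
        if ident in self._seen:
            self._seen.move_to_end(ident)  # refresh its recency
            return False
        self._seen[ident] = None
        if len(self._seen) > self._capacity:
            self._seen.popitem(last=False)  # evict the least recently seen
        return True
```

A duplicate that arrives after its original has been evicted from the window is not detected, so the capacity should cover the expected redelivery horizon.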
Multiple tasks¶
The SQS Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when a single task cannot keep up with the rate at which messages arrive in the queue.
Automatic retries¶
The SQS Source connector may experience problems connecting to the SQS endpoint because of network issues or, on rare occasions, an AWS outage. The connector automatically retries polling SQS. The sqs.max.retries property controls how many times retries are attempted.
The connector internally uses the AWS SDK for Java, which applies an exponential backoff strategy so that the delay between subsequent retries increases. For more information, refer to Error Retries and Exponential Backoff in AWS.
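The SDK's retry policy is internal to the connector, so the following is only a Python illustration of the general pattern: exponential backoff with full jitter. The function call_with_backoff and its delay values are hypothetical; max_retries plays the same role as sqs.max.retries, but the defaults and delay formula used by the AWS SDK for Java may differ.

```python
import random
import time
from typing import Callable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_backoff(
    operation: Callable[[], T],
    max_retries: int = 3,      # same role as sqs.max.retries
    base_delay: float = 0.1,   # seconds; hypothetical value
    max_delay: float = 5.0,    # seconds; hypothetical cap
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
    rng: Optional[random.Random] = None,
) -> T:
    """Run operation(), retrying transient failures with exponential backoff.

    The delay before retry n (0-based) is drawn uniformly from
    [0, min(max_delay, base_delay * 2**n)] ("full jitter").
    """
    rng = rng or random.Random()
    attempt = 0
    while True:
        try:
            return operation()
        except retry_on:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            cap = min(max_delay, base_delay * (2 ** attempt))
            sleep(rng.uniform(0, cap))
            attempt += 1
```

Errors outside retry_on propagate immediately; only failures that are plausibly transient are retried.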
Supports HTTPS proxy¶
The connector can connect to SQS through an HTTPS proxy server. To configure the proxy, set sqs.proxy.url, sqs.proxy.user, and sqs.proxy.password in the connector configuration file.
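As a sketch, the three proxy properties can be added to an existing connector configuration programmatically before it is written out as a properties file. The helper names with_proxy and to_properties, the proxy URL, and the credentials below are placeholders, and the rendering does not escape characters that Java properties files treat specially, such as backslashes.

```python
from typing import Dict, Optional


def with_proxy(config: Dict[str, str], proxy_url: str,
               user: Optional[str] = None,
               password: Optional[str] = None) -> Dict[str, str]:
    """Return a copy of a connector config with the proxy properties added."""
    updated = dict(config)  # leave the caller's dict untouched
    updated["sqs.proxy.url"] = proxy_url
    if user is not None:
        updated["sqs.proxy.user"] = user
    if password is not None:
        updated["sqs.proxy.password"] = password
    return updated


def to_properties(config: Dict[str, str]) -> str:
    """Render sorted key=value lines (no escaping is performed)."""
    return "\n".join(f"{key}={value}" for key, value in sorted(config.items()))


if __name__ == "__main__":
    base = {
        "connector.class": "io.confluent.connect.sqs.source.SqsSourceConnector",
        "tasks.max": "1",
    }
    # Placeholder proxy endpoint and credentials.
    print(to_properties(with_proxy(base, "https://proxy.example.com:3128",
                                   "proxyuser", "proxypass")))
```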
Long polling¶
This connector uses long polling to retrieve messages from SQS. By default, it waits up to 20 seconds for a message to be available in SQS. You can set sqs.waittime.seconds to any value between 1 and 20 seconds.
Fetch multiple messages¶
In every poll cycle, the connector fetches up to sqs.messages.max messages. By default, this value is 10, which is the maximum. This setting works well for most use cases. However, if your messages are exceptionally large, you may want to reduce this to a lower number.
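The poll cycle described in this and the previous sections (long polling, batch fetch, and deleting messages only after they reach Kafka) can be sketched in Python against a boto3-style SQS client. This is not the connector's Java implementation: poll_once and write_to_kafka are hypothetical names, and the parameters wait_time_seconds and max_messages stand in for sqs.waittime.seconds and sqs.messages.max. receive_message and delete_message_batch are boto3 SQS client methods; the client is passed in so the function can be exercised with a stub.

```python
from typing import Any, Callable, Dict


def poll_once(
    client: Any,
    queue_url: str,
    write_to_kafka: Callable[[Dict[str, Any]], None],
    wait_time_seconds: int = 20,  # stands in for sqs.waittime.seconds
    max_messages: int = 10,       # stands in for sqs.messages.max
) -> int:
    """Run one poll cycle and return the number of messages handled.

    `client` is assumed to behave like a boto3 SQS client.
    """
    if not 1 <= wait_time_seconds <= 20:
        raise ValueError("wait_time_seconds must be between 1 and 20")
    if not 1 <= max_messages <= 10:
        raise ValueError("max_messages must be between 1 and 10")

    # Long poll: returns once at least one message is available, or after
    # wait_time_seconds with no messages.
    response = client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,
        WaitTimeSeconds=wait_time_seconds,
        AttributeNames=["All"],
        MessageAttributeNames=["All"],
    )
    messages = response.get("Messages", [])

    # Write every message first; an exception here skips the delete below.
    for message in messages:
        write_to_kafka(message)

    # Delete only after all writes succeeded (a batch holds at most 10
    # entries, which max_messages already guarantees).
    if messages:
        client.delete_message_batch(
            QueueUrl=queue_url,
            Entries=[
                {"Id": str(index), "ReceiptHandle": message["ReceiptHandle"]}
                for index, message in enumerate(messages)
            ],
        )
    return len(messages)
```

If write_to_kafka raises, nothing is deleted, so the messages become visible again after the queue's visibility timeout and are received again. That ordering is what yields at-least-once delivery, and also why duplicates can appear.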
Install the SQS Source Connector¶
You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.
Prerequisites¶
Note
You must install the connector on every machine where Connect will run.
- An install of the Confluent Hub Client.
  Note
  This is installed by default with Confluent Enterprise.
- An install of the latest connector version.
  To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
  confluent-hub install confluentinc/kafka-connect-sqs:latest
  You can install a specific version by replacing latest with a version number as shown in the following example:
  confluent-hub install confluentinc/kafka-connect-sqs:1.2.4
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Amazon SQS Source Connector Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.
Quick Start¶
In this quick start guide, you use the SQS source connector to move messages from an SQS queue into a Kafka topic.
Prerequisites¶
- Confluent Platform is installed and services are running by using the Confluent CLI. This quick start assumes that you are using the Confluent CLI, but standalone installations are also supported. By default, ZooKeeper, Kafka, Schema Registry, Connect, and the Connect REST API are started with the confluent local services start command. For more information, see Quick Start for Apache Kafka using Confluent Platform (Local).
- You must install the AWS CLI and configure it by running the aws configure command.
- Make sure that the IAM user or role you configure has full access to SQS.
- Create a Kafka topic called test-sqs-source.
Create a standard SQS queue by running the following command:
aws sqs create-queue --queue-name sqs-source-connector-demo
You should see output similar to the following:
{
"QueueUrl": "https://queue.amazonaws.com/940887362971/sqs-source-connector-demo"
}
Next, add some records to this newly created queue. Create a file called send-message-batch.json with the following content:
[
{
"Id":"FuelReport-0001-2015-09-16T140731Z",
"MessageBody":"Fuel report for account 0001 on 2015-09-16 at 02:07:31 PM.",
"DelaySeconds":10,
"MessageAttributes":{
"SellerName":{
"DataType":"String",
"StringValue":"Example Store"
},
"City":{
"DataType":"String",
"StringValue":"Any City"
},
"Region":{
"DataType":"String",
"StringValue":"WA"
},
"PostalCode":{
"DataType":"String",
"StringValue":"99065"
},
"PricePerGallon":{
"DataType":"Number",
"StringValue":"1.99"
}
}
},
{
"Id":"FuelReport-0002-2015-09-16T140930Z",
"MessageBody":"Fuel report for account 0002 on 2015-09-16 at 02:09:30 PM.",
"DelaySeconds":10,
"MessageAttributes":{
"SellerName":{
"DataType":"String",
"StringValue":"Example Fuels"
},
"City":{
"DataType":"String",
"StringValue":"North Town"
},
"Region":{
"DataType":"String",
"StringValue":"WA"
},
"PostalCode":{
"DataType":"String",
"StringValue":"99123"
},
"PricePerGallon":{
"DataType":"Number",
"StringValue":"1.87"
}
}
}
]
Then run the following command:
aws sqs send-message-batch --queue-url https://queue.amazonaws.com/940887362971/sqs-source-connector-demo --entries file://send-message-batch.json
Next, load the SQS Source connector:
Tip
Before starting the connector, verify that the configuration parameters in etc/kafka-connect-sqs/quickstart-sqs.properties are properly set. For example, make sure that sqs.url points to the correct SQS URL. The sqs.url parameter has the format sqs.url=https://sqs.<region-code>.amazonaws.com/<account_no>/<queue-name> (for a FIFO queue, the queue name ends in .fifo). For example, if the AWS CLI returns the queue URL https://eu-central-1.queue.amazonaws.com/829250931565/sqs-source-connector-demo, the sqs.url for the SQS source connector is https://sqs.eu-central-1.amazonaws.com/829250931565/sqs-source-connector-demo.
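The URL rewrite in the tip above can be automated. The following Python sketch, to_connector_sqs_url (a hypothetical helper), assumes CLI URLs of the form https://<region-code>.queue.amazonaws.com/<account_no>/<queue-name>. For a region-less host such as queue.amazonaws.com, it requires the region to be passed explicitly rather than guessing it.

```python
from typing import Optional
from urllib.parse import urlparse

LEGACY_SUFFIX = ".queue.amazonaws.com"


def to_connector_sqs_url(cli_queue_url: str, region: Optional[str] = None) -> str:
    """Rewrite a queue URL returned by the AWS CLI into the sqs.url form.

    https://<region>.queue.amazonaws.com/<account>/<queue>
        -> https://sqs.<region>.amazonaws.com/<account>/<queue>
    """
    parsed = urlparse(cli_queue_url)
    host = parsed.hostname or ""
    if host.startswith("sqs.") and host.endswith(".amazonaws.com"):
        return cli_queue_url  # already in the expected form
    if host.endswith(LEGACY_SUFFIX):
        region_code = host[: -len(LEGACY_SUFFIX)]
    elif host == "queue.amazonaws.com":
        if region is None:
            raise ValueError("this URL has no region; pass region explicitly")
        region_code = region
    else:
        raise ValueError(f"unrecognized SQS host: {host}")
    return f"https://sqs.{region_code}.amazonaws.com{parsed.path}"


if __name__ == "__main__":
    print(to_connector_sqs_url(
        "https://eu-central-1.queue.amazonaws.com/829250931565/sqs-source-connector-demo"))
```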
Important
Make sure that the sqs.url configuration parameter points to the correct SQS URL.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services connect connector load sqs-source
Your output should resemble:
{
"name": "sqs-source",
"config": {
"connector.class": "io.confluent.connect.sqs.source.SqsSourceConnector",
"tasks.max": "1",
"kafka.topic": "test-sqs-source",
"sqs.url": "https://sqs.us-east-1.amazonaws.com/942288736285822/sqs-fifo-queue.fifo",
"name": "sqs-source"
},
"tasks": [],
"type": null
}
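The same configuration can also be submitted directly to the Kafka Connect REST API, which accepts a POST /connectors request whose JSON body contains name and config. This Python sketch only builds the request; it assumes the worker listens on the default REST port 8083 on localhost, the helper name build_create_connector_request is hypothetical, and the sqs.url value is a placeholder to replace with your queue URL.

```python
import json
import urllib.request
from typing import Dict


def build_create_connector_request(
    name: str,
    config: Dict[str, str],
    connect_url: str = "http://localhost:8083",  # assumed REST endpoint
) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_create_connector_request(
        "sqs-source",
        {
            "connector.class": "io.confluent.connect.sqs.source.SqsSourceConnector",
            "tasks.max": "1",
            "kafka.topic": "test-sqs-source",
            # Placeholder: use your own queue URL in the sqs.<region-code> form.
            "sqs.url": "https://sqs.<region-code>.amazonaws.com/<account_no>/<queue-name>",
        },
    )
    print(request.get_method(), request.full_url)
    # urllib.request.urlopen(request) would submit it to a running worker.
```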
After the connector finishes ingesting data to Kafka, check that the data is available in the Kafka topic.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-sqs-source --from-beginning
You should see two records. The first one looks similar to the following:
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int64",
"optional":false,
"field":"ApproximateFirstReceiveTimestamp"
},
{
"type":"int32",
"optional":false,
"field":"ApproximateReceiveCount"
},
{
"type":"string",
"optional":false,
"field":"SenderId"
},
{
"type":"int64",
"optional":false,
"field":"SentTimestamp"
},
{
"type":"string",
"optional":true,
"field":"MessageDeduplicationId"
},
{
"type":"string",
"optional":true,
"field":"MessageGroupId"
},
{
"type":"string",
"optional":true,
"field":"SequenceNumber"
},
{
"type":"string",
"optional":false,
"field":"Body"
}
],
"optional":false,
"version":1
},
"payload":{
"ApproximateFirstReceiveTimestamp":1563430750668,
"ApproximateReceiveCount":2,
"SenderId":"AIDA5WEKBZWN3QYIY7KAJ",
"SentTimestamp":1563430591780,
"MessageDeduplicationId":null,
"MessageGroupId":null,
"SequenceNumber":null,
"Body":"Fuel report for account 0001 on 2015-09-16 at 02:07:31 PM."
}
}
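To check the output programmatically, you can parse each printed record. The Python sketch below assumes the record is printed as a JSON envelope with schema and payload fields, as in the sample above. It extracts the payload, checks the SentTimestamp <= ApproximateFirstReceiveTimestamp relationship listed in the Record Schema section, and converts epoch-millisecond timestamps to UTC. The function names read_payload and millis_to_utc are hypothetical.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def read_payload(record_json: str) -> Dict[str, Any]:
    """Return the payload of a schema+payload JSON envelope after a sanity check."""
    payload = json.loads(record_json)["payload"]
    if payload["SentTimestamp"] > payload["ApproximateFirstReceiveTimestamp"]:
        raise ValueError("SentTimestamp is later than ApproximateFirstReceiveTimestamp")
    return payload


def millis_to_utc(epoch_millis: int) -> datetime:
    """Convert an SQS epoch-millisecond timestamp to an aware UTC datetime."""
    return datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)
```

For the sample record, SentTimestamp 1563430591780 corresponds to 2019-07-18 06:16:31 UTC.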
SQS Credentials¶
The following sections provide information about configuring SQS credentials.
Export the AWS region¶
Before configuring credentials, you must export the AWS region. The environment variable must be exported where the Connect worker is running and where the connector is deployed.
export AWS_REGION=<your-aws-region>
Credentials providers¶
The credentials provided must have permission for the actions sqs:ReceiveMessage and sqs:DeleteMessage.
By default, the SQS source connector looks for SQS credentials in the following locations and in the following order:
- Environment variables: The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables accessible to the Connect worker processes where the connector will be deployed. These variables are recognized by the AWS CLI and all AWS SDKs (except for the AWS SDK for .NET). You use export to set these variables:
  export AWS_ACCESS_KEY_ID=<your_access_key_id>
  export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>
  The AWS_ACCESS_KEY and AWS_SECRET_KEY variables can be used instead, but are not recognized by the AWS CLI.
- Java system properties: The aws.accessKeyId and aws.secretKey Java system properties on the Connect worker processes where the connector will be deployed. However, these properties are only recognized by the AWS SDK for Java and are not recommended.
- Credentials file: The ~/.aws/credentials file located in the home directory of the operating system user that runs the Connect worker processes. These credentials are recognized by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:
  aws configure
  You can also manually create the credentials file using a text editor. The file should contain lines in the following format:
  [default]
  aws_access_key_id = <your_access_key_id>
  aws_secret_access_key = <your_secret_access_key>
  Note
  When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user’s home directory. Otherwise, the SQS connector will not be able to find the credentials.
  See AWS Credentials File Format for additional details.
- Container credentials: A query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials. This is applicable only if the Connect worker processes are running in AWS containers.
- EC2 instance metadata: A metadata query that returns credentials from an EC2 instance. This is applicable only if the Connect worker processes are running in EC2 instances.
Choose one of the above to define the AWS credentials that the SQS connectors use, verify the credentials implementation is set correctly, and then restart all of the Connect worker processes.
Note
Confluent recommends using either Environment variables or a Credentials file because these are the most straightforward, and they can be checked using the AWS CLI tool before running the connector.
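The lookup order above can be illustrated with a simplified Python sketch that covers only the environment-variable and credentials-file sources. resolve_credentials is a hypothetical function, not the SDK's DefaultAWSCredentialsProviderChain: it takes the environment and the file contents as arguments so it can be tested, it skips Java system properties and the container and EC2 metadata endpoints, and it treats each variable pair as a unit, which may differ from how the SDK resolves the alternate names.

```python
import configparser
from typing import Mapping, Optional, Tuple

Credentials = Tuple[str, str]


def resolve_credentials(env: Mapping[str, str],
                        credentials_file_text: Optional[str] = None,
                        profile: str = "default") -> Optional[Credentials]:
    """Return (access_key_id, secret_access_key) from the first source that has both."""
    # 1. Environment variables: preferred names first, then the alternates.
    for key_var, secret_var in (("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"),
                                ("AWS_ACCESS_KEY", "AWS_SECRET_KEY")):
        if env.get(key_var) and env.get(secret_var):
            return env[key_var], env[secret_var]
    # 2. Java system properties: not visible from Python, skipped here.
    # 3. Shared credentials file (normally ~/.aws/credentials).
    if credentials_file_text:
        parser = configparser.ConfigParser()
        parser.read_string(credentials_file_text)
        if parser.has_section(profile):
            section = parser[profile]
            key = section.get("aws_access_key_id")
            secret = section.get("aws_secret_access_key")
            if key and secret:
                return key, secret
    # 4. and 5. Container and EC2 metadata endpoints are out of scope here.
    return None
```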
Custom Credentials Provider¶
All SQS connectors run in a single Connect worker cluster and use the same credentials. This is sufficient for many use cases. If you want more control, you can create a custom credentials provider.
A credentials provider is a Java class that implements the com.amazonaws.auth.AWSCredentialsProvider interface in the AWS Java library and returns AWS credentials from the environment. By default, the SQS connector configuration property sqs.credentials.provider.class uses the com.amazonaws.auth.DefaultAWSCredentialsProviderChain class, which looks for credentials in the five locations described in the previous section.
You can use a different credentials provider. To do this, set the sqs.credentials.provider.class property to the name of any class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.
Complete the following steps to use a different credentials provider:
- Find or create a Java credentials provider class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.
- Put the class file in a JAR file.
- Place the JAR file in the share/java/kafka-connect-sqs directory on all Connect workers.
- Restart the Connect workers.
- Change the SQS connector property file to use your custom credentials. Add the provider class entry sqs.credentials.provider.class=<className> in the SQS connector properties file. Note that you must use the fully qualified class name.
Record Schema¶
The source connector creates records in the following format:
Key Schema¶
The key is a struct with the following fields:
Field Name | Schema Type | Optional? | Description |
---|---|---|---|
QueueUrl | String | No | The fully qualified SQS queue URL from which this record was generated. |
MessageId | String | No | The unique message ID of the message within SQS. |
MessageGroupId | String | Yes | In case of FIFO queues, the message group ID. |
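As an illustration of the key layout, the following Python sketch builds the key fields from a message shaped like an entry of boto3's receive_message response (an assumption; the connector itself is written in Java). build_key is a hypothetical name.

```python
from typing import Any, Dict, Optional


def build_key(queue_url: str, message: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Build the key struct fields for one SQS message."""
    attributes = message.get("Attributes", {})
    return {
        "QueueUrl": queue_url,
        "MessageId": message["MessageId"],
        # Present only for FIFO queues; None otherwise (the field is optional).
        "MessageGroupId": attributes.get("MessageGroupId"),
    }
```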
Value Schema¶
The value is a struct with the following fields:
Field Name | Schema Type | Optional? | Description |
---|---|---|---|
Body | String | No | The body of the message. |
ApproximateFirstReceiveTimestamp | int64 | No | The time the message was first received from the queue, in epoch milliseconds. |
ApproximateReceiveCount | int32 | No | The number of times the message has been received from the queue but not deleted. |
SenderId | String | No | The IAM user or role that sent this message to SQS. |
SentTimestamp | int64 | No | The time the sender sent the message to SQS, in epoch milliseconds. SentTimestamp <= ApproximateFirstReceiveTimestamp. |
MessageDeduplicationId | String | Yes | For FIFO queues, a unique ID that can be used to eliminate duplicate messages. |
MessageGroupId | String | Yes | For FIFO queues, the message group that this message belongs to. |
SequenceNumber | String | Yes | For FIFO queues, a globally unique sequence number generated by SQS. |
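Similarly, here is a Python sketch of the value struct. SQS returns system attributes as strings, so the int64 and int32 fields are parsed; the optional FIFO fields become None when absent. The input shape (a boto3-style message requested with all attributes) and the function name build_value are assumptions.

```python
from typing import Any, Dict, Optional, Union

Value = Dict[str, Optional[Union[str, int]]]


def build_value(message: Dict[str, Any]) -> Value:
    """Build the value struct fields for one SQS message."""
    attributes = message["Attributes"]
    return {
        "Body": message["Body"],
        # Numeric system attributes arrive as strings and are parsed here.
        "ApproximateFirstReceiveTimestamp": int(attributes["ApproximateFirstReceiveTimestamp"]),
        "ApproximateReceiveCount": int(attributes["ApproximateReceiveCount"]),
        "SenderId": attributes["SenderId"],
        "SentTimestamp": int(attributes["SentTimestamp"]),
        # FIFO-only attributes: absent on standard queues, so None.
        "MessageDeduplicationId": attributes.get("MessageDeduplicationId"),
        "MessageGroupId": attributes.get("MessageGroupId"),
        "SequenceNumber": attributes.get("SequenceNumber"),
    }
```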
Header Schema¶
Each message attribute in SQS is converted to a header in Kafka.
- The key of the header is the name of the message attribute.
- The value of the header is the value of the message attribute.
- The schema of the header depends on the data type of the message attribute:
  - String message attributes have a string schema.
  - Number message attributes also have a string schema.
  - Binary message attributes have a bytes schema.
  - Custom message attributes have either a string or a bytes schema, depending on the type of the custom attribute.
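The mapping rules above can be expressed as a small Python function. It assumes the boto3-style MessageAttributes shape, with DataType, StringValue, and BinaryValue entries, and classifies a custom type such as Number.float or Binary.gif by its base type before the first dot. header_schema_for and build_headers are hypothetical names.

```python
from typing import Any, Dict, List, Tuple, Union

Header = Tuple[str, str, Union[str, bytes]]


def header_schema_for(data_type: str) -> str:
    """Map an SQS attribute DataType to a header schema name."""
    base_type = data_type.split(".", 1)[0]  # "Number.float" -> "Number"
    if base_type in ("String", "Number"):
        return "string"  # Number values keep their original string form
    if base_type == "Binary":
        return "bytes"
    raise ValueError(f"unsupported SQS data type: {data_type}")


def build_headers(message_attributes: Dict[str, Dict[str, Any]]) -> List[Header]:
    """Turn each message attribute into a (key, schema, value) header triple."""
    headers: List[Header] = []
    for name, attribute in message_attributes.items():
        schema = header_schema_for(attribute["DataType"])
        field = "BinaryValue" if schema == "bytes" else "StringValue"
        headers.append((name, schema, attribute[field]))
    return headers
```

Applied to the quick start's first message, PricePerGallon becomes a string header with the value "1.99" rather than a parsed number.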