Important
You are viewing documentation for an older version of Confluent Platform.
Amazon SQS Source Connector for Confluent Platform¶
The Kafka Connect Simple Queue Service (SQS) Source Connector is used to move messages from AWS SQS Queues into Apache Kafka®. It supports both Standard and FIFO queues.
This connector polls an SQS queue, converts SQS messages into Kafka records, and pushes the records into a Kafka topic.
Each SQS message is converted into exactly one Kafka record, with the following structure:
- The key encodes the SQS queue name and message ID in a struct. For FIFO queues, it also includes the message group ID.
- The value encodes the body of the SQS message and various message attributes in a struct.
- The headers encode the message attributes that may be present in the SQS message, one header per attribute.
The schema for key, value and headers is described in the section Record Schema.
For standard queues, this connector supports best-effort ordering guarantees. This means that there is a chance records will end up in a different order in Kafka. Also, the connector supports at least once delivery guarantees. This means there is a chance that the connector can introduce duplicate records in Kafka.
For FIFO queues, this connector guarantees records are inserted into Kafka in the order they were inserted in SQS, as long as the destination Kafka topic has exactly one partition. If the destination topic has more than one partition, you can write a Single Message Transform (SMT) to set the partition based on the MessageGroupId field in the key.
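The partition mapping that such a transform would need can be sketched as follows. This is only an illustration of the logic in Python; a real SMT is a Java class, and the function name and the choice of CRC32 as the hash are assumptions made for this sketch, not what the connector or Kafka's default partitioner uses.

```python
import zlib


def partition_for_group(message_group_id: str, num_partitions: int) -> int:
    """Map a MessageGroupId to a partition number in [0, num_partitions).

    Hypothetical helper: every record with the same MessageGroupId lands in
    the same partition, which preserves ordering within a message group.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    # crc32 is stable across processes, unlike Python's built-in hash().
    checksum = zlib.crc32(message_group_id.encode("utf-8"))
    return checksum % num_partitions


if __name__ == "__main__":
    for group in ("orders", "payments", "orders"):
        print(group, partition_for_group(group, 6))
```

Because the mapping depends only on the group ID, ordering is preserved per message group rather than across the whole queue.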
Features¶
The SQS Source connector offers the following features:
At Least Once Delivery: The connector guarantees that messages from SQS are delivered at least once to the Kafka topic.
Automatic Retries: The SQS source connector may experience problems connecting to the SQS endpoint due to network issues or, on rare occasions, an AWS outage. The connector automatically retries polling SQS. The property sqs.max.retries controls how many retries are attempted. Internally, the connector uses the AWS SDK for Java, which applies an exponential backoff strategy so that the delay between subsequent retries increases. For more information, refer to Error Retries and Exponential Backoff in the AWS documentation.
Supports HTTPS Proxy: The connector can connect to SQS using an HTTPS proxy server. To configure the proxy, set sqs.proxy.url, sqs.proxy.user, and sqs.proxy.password in the configuration file.
Long Polling: This connector uses long polling to retrieve messages from SQS. By default, it waits up to 20 seconds for a message to be available in SQS. You can change sqs.waittime.seconds to a number between 1 and 20 seconds.
Fetch Multiple Messages: In every poll cycle, the connector fetches up to sqs.messages.max messages. By default, this value is 10, which is the maximum. This setting works well for most use cases. However, if your messages are exceptionally large, you may want to reduce this to a lower number.
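To make the retry behavior concrete, here is a minimal sketch of retrying a poll with exponential backoff. It is not the AWS SDK's implementation (which also adds jitter and caps the delay); the function name, the base delay, and the use of ConnectionError as the retryable failure are assumptions for illustration. The max_retries parameter plays the role that sqs.max.retries plays for the connector.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def poll_with_retries(
    poll: Callable[[], T],
    max_retries: int,
    base_delay_s: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call poll(), retrying on ConnectionError with a doubling delay."""
    attempt = 0
    while True:
        try:
            return poll()
        except ConnectionError:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            # Delay grows as base, 2*base, 4*base, ...
            sleep(base_delay_s * (2 ** attempt))
            attempt += 1
```

Passing the sleep function as a parameter keeps the sketch easy to exercise without actually waiting.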
Install the SQS Source Connector¶
Install the connector using Confluent Hub¶
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-sqs:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-sqs:5.5.15
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Amazon SQS Source Connector Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Quick Start¶
In this quick start guide, you use the SQS source connector to export messages from an SQS queue to a Kafka topic.
Prerequisites
- Confluent Platform is installed and services are running by using the Confluent CLI. This quick start assumes that you are using the Confluent CLI, but standalone installations are also supported. By default, ZooKeeper, Kafka, Schema Registry, Connect, and the Connect REST API are started with the confluent local start command. For more information, see Quick Start for Apache Kafka using Confluent Platform (Local).
- You must install the AWS CLI tools and configure them by running the aws configure command.
- Make sure that the IAM user or role you configure has full access to SQS.
- Create a Kafka topic called sqs-quickstart.
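Create a queue by running the following command. As written, the command creates a standard queue; a FIFO queue would require a queue name ending in .fifo and the FifoQueue=true attribute.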
aws sqs create-queue --queue-name sqs-source-connector-demo
You should see output similar to the following:
{
"QueueUrl": "https://queue.amazonaws.com/940887362971/sqs-source-connector-demo"
}
Next, add some records to this newly created queue. Create a file called send-message-batch.json with the following content:
[
{
"Id":"FuelReport-0001-2015-09-16T140731Z",
"MessageBody":"Fuel report for account 0001 on 2015-09-16 at 02:07:31 PM.",
"DelaySeconds":10,
"MessageAttributes":{
"SellerName":{
"DataType":"String",
"StringValue":"Example Store"
},
"City":{
"DataType":"String",
"StringValue":"Any City"
},
"Region":{
"DataType":"String",
"StringValue":"WA"
},
"PostalCode":{
"DataType":"String",
"StringValue":"99065"
},
"PricePerGallon":{
"DataType":"Number",
"StringValue":"1.99"
}
}
},
{
"Id":"FuelReport-0002-2015-09-16T140930Z",
"MessageBody":"Fuel report for account 0002 on 2015-09-16 at 02:09:30 PM.",
"DelaySeconds":10,
"MessageAttributes":{
"SellerName":{
"DataType":"String",
"StringValue":"Example Fuels"
},
"City":{
"DataType":"String",
"StringValue":"North Town"
},
"Region":{
"DataType":"String",
"StringValue":"WA"
},
"PostalCode":{
"DataType":"String",
"StringValue":"99123"
},
"PricePerGallon":{
"DataType":"Number",
"StringValue":"1.87"
}
}
}
]
Then run the following command:
aws sqs send-message-batch --queue-url https://queue.amazonaws.com/940887362971/sqs-source-connector-demo --entries file://send-message-batch.json
Next, load the SQS Source Connector:
Tip
Before starting the connector, verify that the configuration parameters in etc/kafka-connect-sqs/quickstart-sqs.properties are properly set. For example, make sure that sqs.url points to the correct SQS URL. The sqs.url parameter format is: sqs.url=https://sqs.<region-code>.amazonaws.com/<account_no>/<queue-name> (FIFO queue names end in .fifo). For example, if the AWS CLI returns the queue URL https://eu-central-1.queue.amazonaws.com/829250931565/sqs-source-connector-demo, the sqs.url for the SQS source connector is https://sqs.eu-central-1.amazonaws.com/829250931565/sqs-source-connector-demo.
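The URL rewrite described in this tip can be sketched as a small helper. The function name is hypothetical, and the sketch only handles the <region>.queue.amazonaws.com form shown in the example; a URL without a region prefix is returned unchanged because the region cannot be inferred from it.

```python
import re

# Matches queue URLs of the form https://<region>.queue.amazonaws.com/<account>/<queue>
_LEGACY_QUEUE_URL = re.compile(
    r"^https://(?P<region>[a-z0-9-]+)\.queue\.amazonaws\.com/(?P<rest>.+)$"
)


def to_sqs_url(queue_url: str) -> str:
    """Rewrite a <region>.queue.amazonaws.com URL to the sqs.<region> form."""
    match = _LEGACY_QUEUE_URL.match(queue_url)
    if match is None:
        # Already in sqs.<region> form, or no region to work with: leave as-is.
        return queue_url
    return f"https://sqs.{match.group('region')}.amazonaws.com/{match.group('rest')}"


if __name__ == "__main__":
    print(to_sqs_url(
        "https://eu-central-1.queue.amazonaws.com/829250931565/sqs-source-connector-demo"
    ))
```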
Important
Make sure that the sqs.url configuration parameter points to the correct SQS URL.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.
confluent local load sqs-source
Your output should resemble:
{
"name": "sqs-source",
"config": {
"connector.class": "io.confluent.connect.sqs.source.SqsSourceConnector",
"tasks.max": "1",
"kafka.topic": "test-sqs-source",
"sqs.url": "https://sqs.us-east-1.amazonaws.com/942288736285822/sqs-fifo-queue.fifo",
"name": "sqs-source"
},
"tasks": [],
"type": null
}
After the connector finishes ingesting data to Kafka, check that the data is available in the Kafka topic named by the kafka.topic property (test-sqs-source in the output above).
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-sqs-source --from-beginning
You should see 2 records, similar to the following:
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int64",
"optional":false,
"field":"ApproximateFirstReceiveTimestamp"
},
{
"type":"int32",
"optional":false,
"field":"ApproximateReceiveCount"
},
{
"type":"string",
"optional":false,
"field":"SenderId"
},
{
"type":"int64",
"optional":false,
"field":"SentTimestamp"
},
{
"type":"string",
"optional":true,
"field":"MessageDeduplicationId"
},
{
"type":"string",
"optional":true,
"field":"MessageGroupId"
},
{
"type":"string",
"optional":true,
"field":"SequenceNumber"
},
{
"type":"string",
"optional":false,
"field":"Body"
}
],
"optional":false,
"version":1
},
"payload":{
"ApproximateFirstReceiveTimestamp":1563430750668,
"ApproximateReceiveCount":2,
"SenderId":"AIDA5WEKBZWN3QYIY7KAJ",
"SentTimestamp":1563430591780,
"MessageDeduplicationId":null,
"MessageGroupId":null,
"SequenceNumber":null,
"Body":"Fuel report for account 0001 on 2015-09-16 at 02:07:31 PM."
}
}
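A record value in this schema-plus-payload form can be unpacked with a few lines of code. The sketch below is illustrative only: the function name is hypothetical, and the sample is a trimmed-down copy of the record above with just two fields.

```python
import json


def extract_payload(record_value: str) -> dict:
    """Return the payload of a schema-plus-payload JSON record value."""
    envelope = json.loads(record_value)
    declared = {field["field"] for field in envelope["schema"]["fields"]}
    payload = envelope["payload"]
    # Every field declared in the schema should be present in the payload.
    missing = declared - set(payload)
    if missing:
        raise ValueError(f"payload is missing declared fields: {sorted(missing)}")
    return payload


# Trimmed-down copy of the sample record (two fields only).
SAMPLE = json.dumps({
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "int64", "optional": False, "field": "SentTimestamp"},
            {"type": "string", "optional": False, "field": "Body"},
        ],
        "optional": False,
        "version": 1,
    },
    "payload": {
        "SentTimestamp": 1563430591780,
        "Body": "Fuel report for account 0001 on 2015-09-16 at 02:07:31 PM.",
    },
})

if __name__ == "__main__":
    print(extract_payload(SAMPLE)["Body"])
```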
SQS Credentials¶
The following sections provide information about configuring SQS credentials.
Export the AWS region¶
Before configuring credentials, you must export the AWS region. The environment variable must be exported where the Connect worker is running and where the connector is deployed.
export AWS_REGION=<your-aws-region>
Credentials providers¶
By default, the SQS source connector looks for SQS credentials in the following locations and in the following order:
- The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables accessible to the Connect worker processes where the connector will be deployed. These variables are recognized by the AWS CLI and all AWS SDKs (except for the AWS SDK for .NET). You use export to set these variables.
  export AWS_ACCESS_KEY_ID=<your_access_key_id>
  export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>
  The AWS_ACCESS_KEY and AWS_SECRET_KEY variables can be used instead, but are not recognized by the AWS CLI.
- The aws.accessKeyId and aws.secretKey Java system properties on the Connect worker processes where the connector will be deployed. However, these variables are only recognized by the AWS SDK for Java and are not recommended.
- The ~/.aws/credentials file located in the home directory of the operating system user that runs the Connect worker processes. These credentials are recognized by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:
  aws configure
  You can also manually create the credentials file using a text editor. The file should contain lines in the following format:
  [default]
  aws_access_key_id = <your_access_key_id>
  aws_secret_access_key = <your_secret_access_key>
  Note
  When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user's home directory. Otherwise, the SQS connector will not be able to find the credentials.
  See AWS Credentials File Format for additional details.
- A query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials. This is applicable only if the Connect worker processes are running in AWS containers.
- A metadata query that returns credentials from an EC2 instance. This is applicable only if the Connect worker processes are running in EC2 instances.
Choose one of the above to define the AWS credentials that the SQS connectors use, verify the credentials implementation is set correctly, and then restart all of the Connect worker processes.
Note
Confluent recommends using either Environment variables or a Credentials file because these are the most straightforward, and they can be checked using the AWS CLI tool before running the connector.
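The lookup order described above can be modeled with a short function, which may help when working out which credentials a worker is likely to pick up. This is a simplified model of the documented order, not the AWS SDK's code; the function name and the returned labels are assumptions made for this sketch.

```python
from typing import Mapping


def first_credentials_source(
    env: Mapping[str, str],
    java_props: Mapping[str, str],
    credentials_file_exists: bool,
) -> str:
    """Return a label for the first location that would supply credentials."""
    if "AWS_ACCESS_KEY_ID" in env and "AWS_SECRET_ACCESS_KEY" in env:
        return "environment variables"
    if "AWS_ACCESS_KEY" in env and "AWS_SECRET_KEY" in env:
        return "environment variables (alternate names)"
    if "aws.accessKeyId" in java_props and "aws.secretKey" in java_props:
        return "java system properties"
    if credentials_file_exists:
        return "credentials file"
    if "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI" in env:
        return "container credentials"
    # Last resort in the documented order: the EC2 instance metadata query.
    return "ec2 instance metadata"


if __name__ == "__main__":
    import os
    from pathlib import Path

    found = first_credentials_source(
        os.environ, {}, (Path.home() / ".aws" / "credentials").is_file()
    )
    print(found)
```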
Using Trusted Account Credentials¶
This connector can assume a role and use credentials from a separate trusted account. This is a default feature provided with recent versions of this connector that include an updated version of the AWS SDK.
After you create the trust relationship, an IAM user or an application from the trusted account can use the AWS Security Token Service (AWS STS) AssumeRole API operation. This operation provides temporary security credentials that enable access to AWS resources for the connector. For details, see Creating a Role to Delegate Permissions to an IAM User.
- Example:
Profile in ~/.aws/credentials:
[default]
role_arn=arn:aws:iam::037803949979:role/kinesis_cross_account_role
source_profile=staging
role_session_name = OPTIONAL_SESSION_NAME
[staging]
aws_access_key_id = <STAGING KEY>
aws_secret_access_key = <STAGING SECRET>
To allow the connector to assume a role with the right permissions, set the Amazon Resource Name (ARN) for this role. Additionally, you must choose between source_profile or credential_source as the way to get credentials that have permission to assume the role, in the environment where the connector is running.
Note
When setting up trusted account credentials, be aware that the approach of loading profiles from both ~/.aws/credentials and ~/.aws/config does not work when configuring this connector. Assumed role settings and credentials must be placed in the ~/.aws/credentials file.
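A profile like the one in the example can be sanity-checked before restarting the workers. The sketch below is an assumption-laden illustration: the function name and the specific checks are hypothetical, and it only verifies the rules stated in this section (a role_arn is set, exactly one of source_profile or credential_source is chosen, and a referenced source profile exists in the same file).

```python
import configparser
from typing import List


def check_assume_role_profile(credentials_text: str, profile: str = "default") -> List[str]:
    """Return a list of problems found in an assume-role profile (empty if none)."""
    parser = configparser.ConfigParser(delimiters=("=",), interpolation=None)
    parser.read_string(credentials_text)
    if not parser.has_section(profile):
        return [f"profile [{profile}] not found"]

    section = parser[profile]
    problems = []
    if "role_arn" not in section:
        problems.append("role_arn is not set")
    has_source_profile = "source_profile" in section
    has_credential_source = "credential_source" in section
    if has_source_profile == has_credential_source:
        problems.append("set exactly one of source_profile or credential_source")
    if has_source_profile and not parser.has_section(section["source_profile"]):
        problems.append(f"source profile [{section['source_profile']}] not found")
    return problems


EXAMPLE = """\
[default]
role_arn=arn:aws:iam::037803949979:role/kinesis_cross_account_role
source_profile=staging
role_session_name = OPTIONAL_SESSION_NAME
[staging]
aws_access_key_id = <STAGING KEY>
aws_secret_access_key = <STAGING SECRET>
"""

if __name__ == "__main__":
    print(check_assume_role_profile(EXAMPLE))
```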
Custom Credentials Provider¶
All SQS connectors run in a single Connect worker cluster and use the same credentials. This is sufficient for many use cases. If you want more control, you can create a custom credentials provider.
A credentials provider is a Java class that implements the com.amazonaws.auth.AWSCredentialsProvider interface in the AWS Java library and returns AWS credentials from the environment. By default, the SQS connector configuration property sqs.credentials.provider.class uses the com.amazonaws.auth.DefaultAWSCredentialsProviderChain class, which looks for credentials in five different places as mentioned in the previous section.
You can use a different credentials provider. To do this, set the sqs.credentials.provider.class property to the name of any class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.
Complete the following steps to use a different credentials provider:
- Find or create a Java credentials provider class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.
- Put the class file in a JAR file.
- Place the JAR file in the share/java/kafka-connect-sqs directory on all Connect workers.
- Restart the Connect workers.
- Change the SQS connector property file to use your custom credentials. Add the provider class entry sqs.credentials.provider.class=<className> in the SQS connector properties file. Note that you must use the fully qualified class name.
Record Schema¶
The source connector creates records in the following format:
Key Schema¶
The Key is a struct with the following fields:
Field Name | Schema Type | Optional? | Description |
---|---|---|---|
QueueUrl | string | mandatory | the fully qualified SQS queue url from which this record was generated |
MessageId | string | mandatory | the unique message ID of the message within SQS |
MessageGroupId | string | optional | in case of FIFO queues, the message group ID |
Value Schema¶
The Value is a struct with the following fields:
Field Name | Schema Type | Optional? | Description |
---|---|---|---|
Body | string | mandatory | the body of the SQS message. |
ApproximateFirstReceiveTimestamp | int64 | mandatory | the time, in epoch milliseconds, at which this message was first received from the queue. |
ApproximateReceiveCount | int32 | mandatory | the approximate number of times this message has been received from the queue. |
SenderId | string | mandatory | the ID of the IAM user or role that sent this message to SQS. |
SentTimestamp | int64 | mandatory | the time, in epoch milliseconds, at which the sender sent the message to SQS. SentTimestamp <= ApproximateFirstReceiveTimestamp. |
MessageDeduplicationId | string | optional | for FIFO queues, a unique ID that can be used to eliminate duplicate messages. |
MessageGroupId | string | optional | for FIFO queues, the message group that this message belongs to. |
SequenceNumber | string | optional | a globally unique sequence number generated by SQS. |
Header Schema¶
Each message attribute in SQS is converted to a Header in Kafka.
- The key of the header is the key of the message attribute.
- The value of the header is the value of the message attribute.
- The schema of the header depends on the data type of the message attribute.
  - String message attributes have string schema.
  - Number message attributes also have string schema.
  - Binary message attributes have bytes schema.
  - Custom message attributes will either be string or bytes, depending on the type of custom attribute.
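The mapping from SQS attribute data types to header schema types can be sketched as a lookup. The function name is hypothetical, and the handling of custom types is an assumption: SQS custom data types carry a label after a dot (for example Number.float or Binary.png), and this sketch takes the part before the dot as the deciding base type.

```python
def header_schema_type(data_type: str) -> str:
    """Return the header schema type ("string" or "bytes") for an SQS data type."""
    # Custom types look like "<Base>.<label>"; only the base decides the schema.
    base = data_type.split(".", 1)[0]
    if base in ("String", "Number"):
        return "string"
    if base == "Binary":
        return "bytes"
    raise ValueError(f"unsupported SQS attribute data type: {data_type!r}")


if __name__ == "__main__":
    for data_type in ("String", "Number", "Binary", "Number.float", "Binary.png"):
        print(data_type, "->", header_schema_type(data_type))
```

In the quick start messages, for instance, PricePerGallon has data type Number, so under this mapping its header would carry the string "1.99" rather than a numeric value.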