AWS Lambda Sink Connector for Confluent Platform
The Kafka Connect AWS Lambda Sink connector pulls records from one or more Apache Kafka® topics, converts them to JSON, and executes an AWS Lambda function. The response from AWS Lambda can optionally be written to another Kafka topic.
Important
The AWS Lambda Sink connector is designed to access the general AWS Lambda endpoints. The connector doesn’t support gateway endpoints.
The AWS Lambda function can be invoked either synchronously or asynchronously.
In synchronous mode, records within a topic and partition are processed sequentially. Records within different topic partitions, however, can be processed in parallel. If configured, the response from AWS Lambda can be written to a Kafka topic. If errors occur during Lambda execution, the connector can be configured to ignore the error and proceed, log the error, or stop the connector completely.
In asynchronous mode, the connector operates in a fire-and-forget mode. Records are processed on a best-effort, sequential basis. The connector does not attempt any retries. AWS Lambda automatically retries up to two times, after which AWS Lambda can move the request to a dead letter queue.
The connector guarantees at-least-once processing semantics. Under certain circumstances, a record may be processed more than once, so you should design your AWS Lambda function to be idempotent. If you have configured the connector to log the response from AWS Lambda to a Kafka topic, the topic can contain duplicate records. You can enable Kafka log compaction on the topic to remove duplicate records, or write a ksqlDB query to detect duplicate records within a time window.
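As an illustration of idempotency, the following is a minimal sketch of an AWS Lambda handler (Java runtime, using the aws-lambda-java-core library) that treats the Kafka coordinates in the connector payload as a deduplication key. The class name and the in-memory store are hypothetical; a production function would typically persist processed keys in a durable store such as DynamoDB.
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class IdempotentHandler implements RequestHandler<List<Map<String, Object>>, String> {

    // In-memory set for illustration only; it survives only while the execution environment is warm.
    private final Set<String> processed = new HashSet<>();

    @Override
    public String handleRequest(List<Map<String, Object>> batch, Context context) {
        for (Map<String, Object> record : batch) {
            @SuppressWarnings("unchecked")
            Map<String, Object> payload = (Map<String, Object>) record.get("payload");
            // topic-partition-offset uniquely identifies a Kafka record.
            String dedupKey = payload.get("topic") + "-" + payload.get("partition") + "-" + payload.get("offset");
            if (processed.add(dedupKey)) {
                process(payload.get("value"));   // first delivery: run the business logic
            }
            // Duplicate delivery: skip without side effects.
        }
        return "ok";
    }

    private void process(Object value) {
        // Business logic goes here.
    }
}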
Features
The AWS Lambda Sink connector includes the following features:
At least once delivery
This connector guarantees that records are delivered at least once from the Kafka topic.
Dead Letter Queue
This connector supports the Dead Letter Queue (DLQ) functionality. If an InvalidRequestContentException occurs, the entire batch containing the failed record is sent to the DLQ.
For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Multiple tasks
The AWS Lambda Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance when the connector consumes from multiple topic partitions.
Input data formats
The AWS Lambda Sink connector for Confluent Platform supports the following input data formats:
Avro
JSON Schema (JSON_SR)
Protobuf
JSON (schemaless)
Note that you must enable Schema Registry to use a Schema Registry-based format (for example, Avro,
JSON Schema, or Protobuf). If no schema is defined, values are encoded as plain
strings. For example, "name": "Kimberley Human" is encoded as
name=Kimberley Human.
AWS Lambda function versions and aliases
The connector supports invoking specific AWS Lambda function versions or aliases by appending a colon and the desired version or alias to the function name (for example, function:1 for a version or function:alias for an alias).
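For example, assuming a function named my-function with a published alias prod, the connector configuration could reference the alias as follows (the function and alias names are illustrative):
aws.lambda.function.name=my-function:prod
To pin a specific published version instead, use a numeric suffix, for example my-function:1.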
License
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.
Configuration properties
For a complete list of configuration properties for this connector, see Configuration Reference for AWS Lambda Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Upgrading to Version 3.x
Starting with version 3.x, the AWS Lambda Sink connector uses the AWS SDK v2, upgrading from v1. This upgrade is not backward compatible with versions 1.x and 2.x. If you use any custom or built-in AWS credentials providers, you must update your implementation to ensure compatibility with the AWS SDK v2.
Follow the steps below based on the type of credentials provider your implementation uses.
Custom credentials provider
Update credentials provider interface: If you have implemented a custom credentials provider, you must update your code to implement the new AWS SDK v2 interface.
Old interface: com.amazonaws.auth.AWSCredentialsProvider
New interface: software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
Set a new instantiation method: The connector no longer uses a default no-args constructor to create an instance of your custom provider. Instead, your credentials provider class must now implement a static, no-argument create() method that returns a new instance of your provider, as shown in the sketch after this list.
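The following is a minimal sketch of a custom provider migrated to the SDK v2 interface. The class name and the placeholder credentials are illustrative; the AwsCredentialsProvider interface, the static create() factory, and resolveCredentials() are the parts the connector requires.
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

public class MyCustomCredentialsProvider implements AwsCredentialsProvider {

    private MyCustomCredentialsProvider() {
    }

    // The connector instantiates the provider through this static factory
    // method instead of a no-args constructor.
    public static MyCustomCredentialsProvider create() {
        return new MyCustomCredentialsProvider();
    }

    @Override
    public AwsCredentials resolveCredentials() {
        // Look up the key pair from wherever your organization stores it
        // (a secrets manager, vault, file, and so on). Placeholders shown here.
        return AwsBasicCredentials.create("<access-key-id>", "<secret-access-key>");
    }
}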
AWS built-in credentials provider
Use v2 equivalent: If you use a credentials provider built into the AWS SDK itself, you must ensure you use the v2 equivalent of that provider.
Use the create() method: The provider must implement a static, no-argument create() method that returns a new provider instance. See the example after this list.
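For example, the SDK v1 class com.amazonaws.auth.EnvironmentVariableCredentialsProvider maps to the SDK v2 class software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider, which already provides a static create() method. Assuming you configure the provider through the aws.credentials.provider.class property described later on this page, the setting would look like the following:
aws.credentials.provider.class=software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider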
Default credentials provider
Review changes to the default chain: The default credentials provider chain in the AWS SDK v2 has changed the order in which it searches for credentials. The SDK v2 version now checks for system properties before checking for environment variables. This change may affect your connector’s authentication if you previously relied on environment variables and also have system properties set.
For more information about credentials provider migration, see Credentials provider changes for AWS SDK v2.
Install the AWS Lambda Sink connector
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites
You must install the connector on every machine where Connect will run.
Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.
Java 1.8.
AWS credentials (For more details, see Access Key ID and Secret Access Key).
An installation of the latest (latest) connector version.
Install the connector using Confluent CLI
To install the latest connector version, navigate to your Confluent Platform installation
directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-aws-lambda:latest
You can install a specific version by replacing latest with a version number
as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-aws-lambda:3.0.1
Exporting AWS credentials and region
Before you can run this connector, you must provide credentials and the region where the AWS Lambda function is located.
The credentials provided need to have permissions for the following:
lambda:InvokeFunction and lambda:GetFunction. Add a resource to allow invoking all aliases and versions of the function, including $LATEST. When you specify a function name without a version or alias suffix, all underlying versions, aliases, and $LATEST are implicitly included and accessible.
The following shows a JSON example for setting this policy:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction",
"lambda:GetFunction"
],
"Resource": [
"arn:aws:lambda:*:*:function:<function-name>"
]
}
]
}
Note
If you want to restrict the connector to a particular alias or version, update the permission with the alias or version appended at the end, as shown below:
arn:aws:lambda:*:*:function:functionName:alias
OR
arn:aws:lambda:*:*:function:functionName:1
Exporting environment variables is sufficient for a development and testing
environment. However, in a production environment, you should provide
credentials as part of the worker process itself using the configuration
property aws.credentials.provider.class. This is the credentials provider or
provider chain to use for authentication to AWS. By default, the connector
uses DefaultCredentialsProvider. For details on configuring a
credentials provider, see AWS Credentials.
The information provided in AWS Credentials is
applicable for most connectors accessing resources in Amazon Web Services, including the
AWS Lambda Sink connector.
Export the following AWS environment variables to allow the connector to access AWS Lambda. Note that these environment variables must be exported where the Kafka Connect worker processes and the connector are deployed.
AWS_DEFAULT_REGION: To export, run the following command:
export AWS_DEFAULT_REGION=<your-aws-lambda-region>
AWS_ACCESS_KEY_ID: To export, run the following command:
export AWS_ACCESS_KEY_ID=<your-access-key-id>
AWS_SECRET_ACCESS_KEY: To export, run the following command:
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
You can also set the region and the credentials using the aws.lambda.region, aws.access.key.id, and aws.secret.access.key configuration properties. If you set these properties, the connector uses them, and any additional credentials coming from the aws.credentials.provider.class configuration are ignored.
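For example, the following is a sketch of these properties in a connector configuration (the region value is illustrative):
aws.lambda.region=us-east-1
aws.access.key.id=<your-access-key-id>
aws.secret.access.key=<your-secret-access-key>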
Using trusted account credentials
This connector can assume a role and use credentials from a separate trusted account. This is a default feature provided with recent versions of this connector that include an updated version of the AWS SDK.
Important
You cannot use assumed role credentials to access AWS through a proxy server without first passing environment variables or system properties. This is due to an AWS SDK limitation.
After you create the trust relationship, an IAM user or an application from the
trusted account can use the AWS Security Token Service (AWS STS)
AssumeRole API operation. This operation provides temporary security
credentials that enable access to AWS resources for the connector. For
details, see Creating a Role to Delegate Permissions to an IAM User.
Example:
Profile in ~/.aws/credentials:
[default]
role_arn=arn:aws:iam::037803949979:role/kinesis_cross_account_role
source_profile=staging
role_session_name = OPTIONAL_SESSION_NAME
[staging]
aws_access_key_id = <STAGING KEY>
aws_secret_access_key = <STAGING SECRET>
To allow the connector to assume a role with the right permissions, set the
Amazon Resource Name (ARN)
for the role. Additionally, you must use source_profile as the way to get
credentials that have permission to assume the role in the environment where the
connector is running.
Note that when setting up trusted account credentials, loading profiles from
both ~/.aws/credentials and ~/.aws/config does not work when configuring
this connector. Assumed role settings and credentials must be placed in the
~/.aws/credentials file.
Install the connector manually
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Property-based example
This configuration is typically used with standalone workers.
name=LambdaSinkConnector
connector.class=io.confluent.connect.aws.lambda.AwsLambdaSinkConnector
tasks.max=1
topics=<Required Configuration>
aws.lambda.function.name=<Required Configuration>
aws.lambda.invocation.type=sync
aws.lambda.batch.size=50
behavior.on.error=fail
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
REST-based example
This configuration is typically used with distributed
workers. Write the following JSON to
connector.json and configure all of the required values. Use the command below
to post the configuration to one of the distributed Kafka Connect workers.
See Kafka Connect REST API for
more information.
{
"name": "LambdaSinkConnector",
"config" : {
"connector.class" : "io.confluent.connect.aws.lambda.AwsLambdaSinkConnector",
"tasks.max" : "1",
"topics" : "< Required Configuration >",
"aws.lambda.function.name" : "< Required Configuration >",
"aws.lambda.invocation.type" : "sync",
"aws.lambda.batch.size" : "50",
"behavior.on.error" : "fail",
"confluent.topic.bootstrap.servers" : "localhost:9092",
"confluent.topic.replication.factor" : "1"
}
}
Use curl to post the configuration to one of the Kafka Connect workers. Change
http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
AWS Lambda payload
The default payload converter converts Kafka records to a payload in the form of a JSON array. The following is a sample payload:
[
{
"payload": {
"topic": "mytopic",
"partition": 1,
"offset": 101,
"key": "key_from_sink_record",
"value": "value_from_sink_record",
"headers": [{"key":"key1","value":"value1"},{"key":"key2","value":"value2"},{...},...],
"timestamp": 1562844607000
}
},
{
"payload": {
"topic": "mytopic",
"partition": 1,
"offset": 102,
"key": "key_from_sink_record",
"value": "value_from_sink_record",
"headers": [{...},{...},...],
"timestamp": 1562844608000
}
}
]
The key and value are converted to either JSON primitives or objects
according to their schema. If no schema is defined, they are encoded as plain
strings.
For any AWS Lambda invocation, all the records belong to the same topic and partition, and the offsets are in strictly increasing order.
When the connector is configured, it validates that the Lambda function exists and that the provided AWS credentials can invoke it.
Batching records
The AWS Lambda Sink connector combines multiple records into the input payload for the AWS Lambda function invocation. The following rules apply:
All records in a batch belong to the same topic and partition.
A batch always has records in increasing order of the offset.
The total number of records in a batch is less than or equal to the aws.lambda.batch.size configuration, and the size of the batch is less than the AWS Lambda payload limits. To disable batching, set aws.lambda.batch.size to 1. For a configuration sketch, see the example after this list.
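For example, the following illustrative setting caps each invocation at 100 records, subject to the AWS Lambda payload limit:
aws.lambda.batch.size=100
# Or, to disable batching so each record triggers its own invocation:
# aws.lambda.batch.size=1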
Response topic
In sync mode, the connector can optionally log the response from AWS Lambda in
a Kafka topic using Kafka Connect Reporter.
The connector attempts to map each response to a single record before producing it to the corresponding topic. It can receive responses from the AWS Lambda function in the following three formats.
The first format is JSON:
[ { "payload": { "result": ..., "topic": string, "partition": <number>, "offset": <number>, } }, ... ]
This list can be out of order relative to the order in which the records were provided. The connector correctly matches each record to its result based on its Kafka coordinates. However, the list must map one-to-one to the list of records that were sent in the request.
The second format is a JSON list:
[ ..., ..., ... ]
As long as the list maps one-to-one to the list of records, it is assumed to be ordered and is matched with the corresponding records.
The third format is any response that does not satisfy either of the above formats. The connector reports the entire response for each individual record (one-to-many correlation).
The connector logs the output of each record from the AWS Lambda function response in the configured Kafka topic as JSON. The following is an example of a logged JSON response from an AWS Lambda function:
[
{
"topic": "lambda-success",
"partition": 1,
"offset": 101,
"timestamp": 1562844608000,
"headers": [{...},{...},...],
"key": "sink_record_key",
"value": {
"body": "Successfully executed Lambda!",
"status_code": 200
}
},
...
]
AWS Lambda function:
def lambda_handler(event, context):
    result_list = []
    for obj in event:
        result = {}
        result["body"] = "Successfully executed Lambda!"
        result["status_code"] = 200
        result_list.append(result)
    return result_list
To enable a response topic, set reporter.result.topic.name to the topic
where you want to log the responses. The following shows a sample set of
properties to add to configure a response topic:
aws.lambda.invocation.type=sync
reporter.result.topic.name=<Required Configuration>
reporter.bootstrap.servers=<Required Configuration>
Note that in async mode, the connector can’t log the response.
For more information about using this connector with Kafka Connect Reporter, see Connect Reporter.
Error handling
The AWS Lambda Sink connector may encounter the following types of errors:
Transient errors
Transient errors include network timeouts and errors caused by rate limiting. The connector handles transient errors through multiple layers:
AWS SDK automatic retries: The connector relies on the AWS Lambda SDK to automatically handle transient errors such as network timeouts or server errors using the SDK’s built-in retry mechanisms.
Connector-level retries: The connector additionally handles specific AWS Lambda service limits (Ec2ThrottledException, EniLimitReachedException) by marking them as retriable, allowing Kafka Connect to retry the entire batch. In asynchronous mode, such retries are not applicable because the connector uses fire-and-forget invocation.
Socket timeout configuration: Configure aws.lambda.socket.timeout (default: 50 seconds, range: 1-600 seconds) to control how long the client waits for AWS Lambda responses, preventing hangs on network issues. This setting is not applicable in asynchronous mode.
Configuration errors
Configuration errors, such as an incorrect AWS Lambda function name, access-related issues, or invalid credentials, are handled as follows:
Startup validation: During connector startup, the connector validates function existence, permissions, and credentials. If validation fails, the connector fails to start.
Runtime configuration errors: If configuration-related errors occur during record processing (for example, function suddenly becomes unavailable, permission changes), the connector does not retry these errors. Instead:
Most configuration errors: The connector throws a ConnectException and stops the task.
Invalid content errors (specifically InvalidRequestContentException): You can optionally send records to a dead letter queue (DLQ), if configured, instead of stopping the connector.
No automatic recovery: Unlike transient errors, configuration errors require manual intervention to fix the underlying configuration issue before restarting the connector.
AWS Lambda function execution errors
Errors encountered during execution of the AWS Lambda function are classified as handled or unhandled, with different handling behaviors depending on the invocation mode.
Handled errors: Exceptions intentionally thrown by your application or business logic (for example, via a throw statement in a try-catch block). Best practice for sync mode is as follows:
Throw exceptions on failure (avoid returning an error string).
The connector treats thrown exceptions as failures and uses behavior.on.error to decide whether to fail, log, or ignore.
Unhandled errors: Errors from AWS Lambda itself (for example, timeouts, throttling, runtime errors).
In sync mode, the connector interprets unhandled errors in the same way as handled errors and processes them according to behavior.on.error.
In async mode, AWS Lambda performs internal retries before dropping the request.
AWS Lambda function execution error scenarios
In case of errors encountered during execution of the AWS Lambda function, the
behavior of the connector depends on the configuration parameters
aws.lambda.invocation.type and behavior.on.error.
| aws.lambda.invocation.type | behavior.on.error | Error Handling |
|---|---|---|
| async |  | The connector relies on AWS Lambda to perform retries and error handling. AWS Lambda retries the function twice (a total of three attempts), after which it discards the event. You should configure a dead letter queue if you want to track the input events that failed. |
| sync | fail | The connector stops processing records when an error occurs, and the connector task fails. |
| sync | log | The connector logs the error message and continues to process the next batch of records. For help on how to configure the Kafka Connect Reporter to report errors in a separate topic, see Connect Reporter. |
| sync | ignore | The connector continues to process the next set of records without logging the error message to a separate topic. |
Logging errors to Kafka topic
The connector can optionally log errors to a Kafka topic using Kafka Connect Reporter. Use the following configuration settings to enable error logging:
aws.lambda.invocation.type=sync
behavior.on.error=log
reporter.error.topic.name=<Required Configuration>
reporter.bootstrap.servers=<Required Configuration>
For details about using Kafka Connect Reporter, see Connect Reporter.
RecordConverter Interface
The AWS Lambda Sink connector can use a custom implementation of the
RecordConverter interface to convert a batch of records to a string payload. The
connector sends the string payload to AWS Lambda in a single invocation.
Implement the following interface and specify the implementing class using the
record.converter.class connector configuration property.
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;

public interface RecordConverter {

  /**
   * Converts a batch of SinkRecords to the string payload sent to AWS Lambda in a
   * single invocation. The list of SinkRecords is converted into one string payload,
   * which is sent when invoking AWS Lambda.
   *
   * @param records the list of sink records
   * @return the payload string sent to AWS Lambda
   */
  String convertRecordsToPayloadString(List<SinkRecord> records);
}
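As an illustration, the following is a minimal sketch of an implementation that joins record values into a newline-delimited payload. The class name is hypothetical.
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.connect.sink.SinkRecord;

public class NewlineDelimitedRecordConverter implements RecordConverter {

  @Override
  public String convertRecordsToPayloadString(List<SinkRecord> records) {
    // One line per record value; the resulting string is sent to AWS Lambda
    // in a single invocation.
    return records.stream()
        .map(record -> String.valueOf(record.value()))
        .collect(Collectors.joining("\n"));
  }
}
To use the converter, set record.converter.class to the fully-qualified name of your implementation class in the connector configuration.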