AWS Lambda Sink Connector for Confluent Platform

The Kafka Connect AWS Lambda sink connector pulls records from one or more Apache Kafka® topics, converts them to JSON, and invokes an AWS Lambda function. The response from the Lambda function can optionally be written to another Kafka topic.

The AWS Lambda function can be invoked either synchronously or asynchronously.

In synchronous mode, records within a topic partition are processed sequentially, while records from different topic partitions can be processed in parallel. If configured, the response from AWS Lambda can be written to a Kafka topic. If an error occurs during Lambda execution, the connector can be configured to ignore the error and proceed, log the error, or stop the connector completely.

In asynchronous mode, the connector operates in a fire-and-forget mode. Records are processed on a best-effort, sequential basis. The connector does not attempt any retries. AWS Lambda automatically retries up to two times, after which AWS Lambda can move the request to a dead letter queue.

The connector guarantees at-least-once processing semantics. Under certain circumstances, it is possible that a record is processed more than once. You should design your AWS Lambda function to be idempotent. If you have configured the connector to log the response from AWS Lambda to a Kafka topic, the topic can contain duplicate records. You can enable Kafka log compaction on the topic to remove duplicate records. Alternatively, you can write a ksqlDB query to detect duplicate records in a time window.
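Because a record can be delivered more than once, one common way to make a function idempotent is to key its side effects on the record's topic, partition, and offset, which the default payload carries (see AWS Lambda Payload). The following Python sketch assumes that payload shape. The in-memory set, `apply_side_effect`, `side_effects`, and the response fields are hypothetical; a real function would need a durable store, because Lambda execution environments are not shared and can be recycled at any time.

```python
# Hypothetical in-memory record of processed (topic, partition, offset) keys.
# A production function would use a durable store (for example, a table with
# conditional writes) instead, since this set is lost when the environment is recycled.
_processed = set()

# Stands in for the function's real external effects in this sketch.
side_effects = []


def apply_side_effect(item):
    """Placeholder for the function's real work (hypothetical)."""
    side_effects.append((item["topic"], item["partition"], item["offset"]))


def lambda_handler(event, context):
    """Apply each record's side effect at most once, keyed on topic/partition/offset."""
    results = []
    for entry in event:
        item = entry["payload"]
        key = (item["topic"], item["partition"], item["offset"])
        duplicate = key in _processed
        if not duplicate:
            apply_side_effect(item)
            _processed.add(key)
        # One response entry per input record, in the same order.
        results.append({"payload": {"offset": item["offset"], "duplicate": duplicate}})
    return results
```

Invoking the handler twice with the same batch applies the side effects only once; the second call reports every record as a duplicate.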

Note

If you are installing the connector on Confluent Cloud, see AWS Lambda Sink Connector for Confluent Cloud.

Prerequisites

The following are required to run the Kafka Connect AWS Lambda Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
  • Java 1.8
  • AWS credentials (see Access Key ID and Secret Access Key)

Exporting AWS Credentials and Region

Before you can run this connector, you must provide credentials and the region where the AWS Lambda function is located.

Exporting environment variables is sufficient for a development and testing environment. However, in a production environment, you should provide credentials as part of the worker process itself using the configuration property aws.credentials.provider.class. This is the credentials provider or provider chain to use for authentication to AWS. By default, the connector uses DefaultAWSCredentialsProviderChain. For details on configuring a credentials provider, see AWS Credentials. The information provided in AWS Credentials is applicable for most connectors accessing resources in Amazon Web Services, including the AWS Lambda Sink connector.

Export the following AWS environment variables to allow the connector to access AWS Lambda. These environment variables must be exported on every host where the Kafka Connect worker processes run and the connector is deployed.

  • AWS_DEFAULT_REGION
  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY

To export these environment variables, enter the following commands:

export AWS_DEFAULT_REGION=<your-aws-lambda-region>
export AWS_ACCESS_KEY_ID=<your-accesskey-id>
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
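Because a missing variable only surfaces once the connector starts, it can help to check the worker's environment beforehand. The following Python sketch is a hypothetical pre-flight helper (`missing_aws_vars` is not part of the connector); it reports which of the three variables listed above are unset or empty.

```python
import os

# The three variables the connector reads in a development or test setup.
REQUIRED_AWS_VARS = ("AWS_DEFAULT_REGION", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_aws_vars(env=None):
    """Return the required AWS variables that are unset or empty in env (default: os.environ)."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_AWS_VARS if not env.get(name)]
```

For example, running `missing_aws_vars()` in the shell session that launches the worker returns an empty list when all three variables are exported.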

Install the AWS Lambda Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-aws-lambda:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-aws-lambda:1.0.0-preview

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see AWS Lambda Sink Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.

Property-based example

This configuration is typically used with standalone workers.

 name=LambdaSinkConnector
 connector.class=io.confluent.connect.aws.lambda.AwsLambdaSinkConnector
 tasks.max=1

 topics=<Required Configuration>

 aws.lambda.function.name=<Required Configuration>
 aws.lambda.invocation.type=sync
 aws.lambda.batch.size=50

 behavior.on.error=fail

 confluent.topic.bootstrap.servers=localhost:9092
 confluent.topic.replication.factor=1

Note

For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to connector.json and fill in all of the required values. See Kafka Connect REST API for more information.

 {
   "name": "LambdaSinkConnector",
   "config" : {
     "connector.class" : "io.confluent.connect.aws.lambda.AwsLambdaSinkConnector",
     "tasks.max" : "1",

     "topics" : "< Required Configuration >",

     "aws.lambda.function.name" : "< Required Configuration >",
     "aws.lambda.invocation.type" : "sync",
     "aws.lambda.batch.size" : "50",

     "behavior.on.error" : "fail",

     "confluent.topic.bootstrap.servers" : "localhost:9092",
     "confluent.topic.replication.factor" : "1"
   }
 }

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
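If you prefer to script the deployment, the same request can be built with the Python standard library. This is a minimal sketch that mirrors the curl command: a POST of connector.json to the worker's /connectors endpoint with a JSON content type. `build_create_request` is a hypothetical helper name, the worker URL is an assumption, and the placeholder values must be replaced before sending.

```python
import json
import urllib.request

# Connector configuration mirroring connector.json; replace the placeholder values.
CONNECTOR = {
    "name": "LambdaSinkConnector",
    "config": {
        "connector.class": "io.confluent.connect.aws.lambda.AwsLambdaSinkConnector",
        "tasks.max": "1",
        "topics": "< Required Configuration >",
        "aws.lambda.function.name": "< Required Configuration >",
        "aws.lambda.invocation.type": "sync",
        "aws.lambda.batch.size": "50",
        "behavior.on.error": "fail",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
    },
}


def build_create_request(connector, worker_url="http://localhost:8083"):
    """Build the same POST /connectors request that the curl command sends."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        worker_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# To send it (requires a running Kafka Connect worker):
# with urllib.request.urlopen(build_create_request(CONNECTOR)) as resp:
#     print(resp.status, resp.read().decode())
```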


AWS Lambda Payload

The default payload converter converts Kafka records into a payload in the form of a JSON array. The following is a sample payload:

[
    {
        "payload": {
            "timestamp": 1562844607000,
            "topic": "mytopic",
            "partition": 1,
            "offset": 43822,
            "key": .....,
            "value": .....
        }
    },
    {
        "payload": {
            "timestamp": 1562844608000,
            "topic": "mytopic",
            "partition": 1,
            "offset": 43823,
            "key": .....,
            "value": .....
        }
    }
    ....
]

The key and value are converted to either JSON primitives or objects according to their schema. If no schema is defined, they are encoded as plain strings.

For any single AWS Lambda invocation, all records belong to the same topic and partition, and their offsets are in strictly increasing order.
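A function can rely on these guarantees, or check them defensively. The following Python sketch (`check_batch` is a hypothetical helper) assumes the payload shape shown above. It confirms that a batch covers a single topic partition with strictly increasing offsets and returns the topic, partition, and offsets.

```python
def check_batch(event):
    """Return (topic, partition, offsets) after checking the per-invocation guarantees.

    Returns None for an empty payload and raises ValueError if a guarantee is violated.
    """
    items = [entry["payload"] for entry in event]
    if not items:
        return None
    topic_partitions = {(item["topic"], item["partition"]) for item in items}
    if len(topic_partitions) != 1:
        raise ValueError(f"batch spans several topic partitions: {sorted(topic_partitions)}")
    offsets = [item["offset"] for item in items]
    # Each offset must be strictly greater than the one before it.
    if any(prev >= nxt for prev, nxt in zip(offsets, offsets[1:])):
        raise ValueError(f"offsets are not strictly increasing: {offsets}")
    topic, partition = topic_partitions.pop()
    return topic, partition, offsets
```

Because offsets arrive in order, a function that tracks progress per partition only needs to remember the last offset it handled.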

When the connector starts, it makes a dry-run invocation of the Lambda function with an empty payload. This validates the parameter values and verifies that the user or role has permission to invoke the function.

Batching Records

The AWS Lambda sink connector combines multiple records into the input payload for the Lambda Function invocation. The following rules apply:

  • All records in a batch belong to the same topic and partition.
  • Records in a batch are always in increasing offset order.
  • The number of records in a batch is at most aws.lambda.batch.size, and the total size of the batch stays below the AWS Lambda payload limits.
  • To disable batching, set aws.lambda.batch.size to 1.
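The rules above can be illustrated with a small grouping function. This Python sketch is not the connector's implementation: `make_batches` is hypothetical, `batch_size` stands in for aws.lambda.batch.size, and `max_bytes` is an assumed stand-in for the Lambda payload limit, with size measured on a compact JSON encoding that may differ from the connector's serialization.

```python
import json
from collections import defaultdict


def make_batches(records, batch_size, max_bytes):
    """Group records into invocation payloads that follow the batching rules.

    records: dicts with "topic", "partition", "offset", "key", and "value".
    batch_size: stands in for aws.lambda.batch.size.
    max_bytes: hypothetical stand-in for the AWS Lambda payload limit.
    """
    by_partition = defaultdict(list)
    for rec in records:
        by_partition[(rec["topic"], rec["partition"])].append(rec)

    batches = []
    for tp in sorted(by_partition):
        current, current_bytes = [], 2  # 2 bytes for the enclosing "[]"
        for rec in sorted(by_partition[tp], key=lambda r: r["offset"]):
            item = {"payload": rec}
            # Approximate encoded size; +1 for the separating comma (slightly conservative).
            size = len(json.dumps(item, separators=(",", ":")).encode("utf-8")) + 1
            full = len(current) >= batch_size or current_bytes + size > max_bytes
            if current and full:
                batches.append(current)
                current, current_bytes = [], 2
            # A single record larger than max_bytes still forms its own batch here.
            current.append(item)
            current_bytes += size
        if current:
            batches.append(current)
    return batches
```

With `batch_size=1` every record becomes its own invocation, which matches the effect of disabling batching.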

Response Topic

In sync mode, the connector can optionally log the response from AWS Lambda to a Kafka topic using Kafka Connect Reporter. Your AWS Lambda function must return JSON in the following format:

[
    {
        "payload": {
            ...
        }
    },
    {
        "payload": {
            ...
        }
    }
    ....
]

The connector makes the following assumptions:

  • The output must be an array and the length must match the length of the input array.
  • Each item in the array must be an object containing the key “payload”.
  • The value of the payload key is the response for the corresponding input record, as a JSON object.
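A function that satisfies these assumptions returns one {"payload": {...}} object per input record, in input order. The following Python sketch shows such a handler together with a hypothetical `validate_response` helper that checks the rules listed above; the per-record result fields (`offset`, `status`) are illustrative only.

```python
def lambda_handler(event, context):
    """Return one {"payload": {...}} object per input record, in the same order."""
    response = []
    for entry in event:
        record = entry["payload"]
        # Hypothetical per-record result; replace with the function's real output.
        result = {"offset": record["offset"], "status": "processed"}
        response.append({"payload": result})
    return response


def validate_response(request, response):
    """Raise ValueError if the response breaks the connector's assumptions."""
    if not isinstance(response, list) or len(response) != len(request):
        raise ValueError("response must be an array with one entry per input record")
    for item in response:
        if not isinstance(item, dict) or not isinstance(item.get("payload"), dict):
            raise ValueError('each entry must be an object whose "payload" is a JSON object')
```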

The connector writes the output for each record, taken from the AWS Lambda function response, to the configured Kafka topic.

To enable a response topic, set reporter.result.topic.name to the topic where you want to log the responses. The following shows a sample set of properties to add to configure a response topic:

aws.lambda.invocation.type=sync

reporter.result.topic.name=<Required Configuration>
reporter.bootstrap.servers=<Required Configuration>

Each JSON entry from the response is separately logged into the configured Kafka response topic. Note that in async mode, the connector cannot log the response.

See Connect Reporter for more about using this connector with Kafka Connect Reporter.

Error Handling

The AWS Lambda sink connector may encounter the following types of errors:

  • Transient errors such as network timeouts or errors because of rate limiting.
  • Configuration errors such as an incorrect Lambda Function name or access-related issues.
  • Errors encountered during execution of the Lambda Function. These errors are further classified as handled or unhandled.

The AWS Lambda sink connector automatically handles transient errors such as network timeouts. The connector relies on the AWS Lambda SDK to perform retries.

In case of configuration errors, the connector does not retry. Instead, it throws a ConnectException and stops the task.

In case of errors encountered during execution of the Lambda Function, the behavior of the connector depends on the configuration parameters aws.lambda.invocation.type and behavior.on.error.

  • aws.lambda.invocation.type=async: The connector relies on AWS Lambda to perform retries and error handling. AWS Lambda retries the function twice (a total of three attempts), after which it discards the event. Configure a dead letter queue if you want to track the input events that failed.
  • aws.lambda.invocation.type=sync, behavior.on.error=fail: fail is the default mode. The connector stops processing records for that topic partition. Records for other topic partitions continue to be processed.
  • aws.lambda.invocation.type=sync, behavior.on.error=log: The connector logs the error message and continues processing the next batch of records. See Connect Reporter for how to configure the Kafka Connect Reporter to report errors in a separate topic.
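The sync-mode behavior can be modeled per topic partition. This Python sketch is a hypothetical illustration, not the connector's code: `PartitionWorker` and the `invoke` callable are assumed names, and any exception raised by `invoke` stands in for an error from the Lambda function. With fail, only the affected partition stops; with log, the error is recorded and processing moves on.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionWorker:
    """Hypothetical model of sync-mode error handling for one topic partition."""

    behavior_on_error: str = "fail"  # mirrors behavior.on.error ("fail" or "log")
    stopped: bool = False
    error_log: list = field(default_factory=list)

    def process(self, batch, invoke):
        """Invoke the function for one batch; return True if the partition can move on."""
        if self.stopped:
            return False  # a failed partition makes no further progress
        try:
            invoke(batch)
            return True
        except Exception as exc:  # stands in for an error returned by the Lambda function
            if self.behavior_on_error == "log":
                self.error_log.append(str(exc))  # record the error, continue with the next batch
                return True
            self.stopped = True  # "fail": stop this partition only
            return False
```

Each partition gets its own worker in this model, so a stopped partition does not block the others.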

Logging Errors to a Kafka Topic

The connector can optionally log errors to a Kafka topic using Kafka Connect Reporter. Use the following configuration settings to enable error logging:

aws.lambda.invocation.type=sync
behavior.on.error=log

reporter.error.topic.name=<Required Configuration>
reporter.bootstrap.servers=<Required Configuration>

For details about using Kafka Connect Reporter, see Connect Reporter.