Kafka Connect AWS Lambda Sink Connector
The AWS Lambda sink connector pulls records from one or more Apache Kafka® topics, converts them to JSON, and invokes an AWS Lambda function. The response from the Lambda function can optionally be written to another Kafka topic.
The AWS Lambda function can be invoked either synchronously or asynchronously.
In synchronous mode, records within a topic partition are processed sequentially, while records in different topic partitions can be processed in parallel. If configured, the response from AWS Lambda can be written to a Kafka topic. If errors occur during Lambda execution, the connector can be configured to ignore the error and proceed, log the error, or stop completely.
In asynchronous mode, the connector operates on a fire-and-forget basis. Records are processed on a best-effort, sequential basis. The connector does not attempt any retries. AWS Lambda automatically retries up to two times, after which AWS Lambda can move the request to a dead letter queue.
The connector guarantees at-least-once processing semantics. Under certain circumstances, it is possible that a record is processed more than once. You should design your AWS Lambda function to be idempotent. If you have configured the connector to log the response from AWS Lambda to a Kafka topic, the topic can contain duplicate records. You can enable Kafka log compaction on the topic to remove duplicate records. Alternatively, you can write a KSQL query to detect duplicate records in a time window.
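For example, here is a minimal sketch of an idempotent handler in Python. It assumes a hypothetical DynamoDB table named processed-records with a record_id partition key; the table name, key, and process() function are illustrative and not part of the connector:

import boto3
from botocore.exceptions import ClientError

# Hypothetical DynamoDB table used to deduplicate deliveries.
table = boto3.resource("dynamodb").Table("processed-records")

def handler(event, context):
    for record in event or []:
        p = record["payload"]
        # Unique id built from the Kafka coordinates the connector sends.
        record_id = f'{p["topic"]}-{p["partition"]}-{p["offset"]}'
        try:
            # The conditional put fails if this record was already processed,
            # so a redelivered record becomes a no-op.
            table.put_item(
                Item={"record_id": record_id},
                ConditionExpression="attribute_not_exists(record_id)",
            )
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                continue  # duplicate delivery; skip side effects
            raise
        process(p["key"], p["value"])  # placeholder for your business logic

def process(key, value):
    ...  # real work goes here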
Prerequisites
The following are required to run the Kafka Connect AWS Lambda Sink Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
- Java 1.8
Install the AWS Lambda Connector
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub
Prerequisite: the Confluent Hub Client must be installed. It is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run this command to install the latest (latest) connector version.
The connector must be installed on every machine where Connect will be run.
confluent-hub install confluentinc/kafka-connect-aws-lambda:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-aws-lambda:1.0.0-preview
Install the Connector Manually
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Property-based example
This configuration is typically used with standalone workers.
name=LambdaSinkConnector
connector.class=io.confluent.connect.aws.lambda.AwsLambdaSinkConnector
tasks.max=1
topics=<Required Configuration>
aws.lambda.function.name=<Required Configuration>
aws.lambda.invocation.type=sync
aws.lambda.batch.size=50
behavior.on.error=fail
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
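As a sketch, assuming the properties above are saved as lambda-sink.properties (hypothetical file name) and a standalone worker configuration exists at etc/kafka/connect-standalone.properties, you can start a standalone worker with:

bin/connect-standalone etc/kafka/connect-standalone.properties lambda-sink.properties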
REST-based example
This configuration is typically used with distributed workers.
Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Kafka Connect workers. See Kafka Connect REST API for more information.
{
"name": "LambdaSinkConnector",
"config" : {
"connector.class" : "io.confluent.connect.aws.lambda.AwsLambdaSinkConnector",
"tasks.max" : "1",
"topics" : "< Required Configuration >",
"aws.lambda.function.name" : "< Required Configuration >",
"aws.lambda.invocation.type" : "sync",
"aws.lambda.batch.size" : "50",
"behavior.on.error" : "fail",
"confluent.topic.bootstrap.servers" : "localhost:9092",
"confluent.topic.replication.factor" : "1"
}
}
Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
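After posting the configuration, you can check the connector's status through the standard Kafka Connect REST API:

curl -s http://localhost:8083/connectors/LambdaSinkConnector/status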
AWS Lambda Payload
The default payload converter converts Kafka records to a payload in the form of a JSON array. The following is a sample payload:
[
{
"payload": {
"timestamp": 1562844607000,
"topic": "mytopic",
"partition": 1,
"offset": 43822,
"key": .....,
"value": .....
}
},
{
"payload": {
"timestamp": 1562844608000,
"topic": "mytopic",
"partition": 1,
"offset": 43823,
"key": .....,
"value": .....
}
}
....
]
The key and value are converted to either JSON primitives or objects according to their schema. If no schema is defined, they are encoded as plain strings.
For any AWS Lambda invocation, all records belong to the same topic and partition, and the offsets are in strictly increasing order.
When the connector starts, it makes a dry-run invocation of the Lambda function with an empty payload to validate parameter values and verify that the user or role has permission to invoke the function.
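For illustration, here is a minimal Python handler sketch that parses this payload format and tolerates the empty dry-run invocation; the handler body is illustrative only:

def handler(event, context):
    # The dry-run call at connector startup arrives with an empty payload.
    if not event:
        return
    for record in event:
        p = record["payload"]
        # key and value are JSON primitives or objects per their schema,
        # or plain strings when no schema is defined.
        print(f'{p["topic"]}-{p["partition"]}@{p["offset"]}: value={p["value"]}')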
Batching Records
The AWS Lambda sink connector combines multiple records into the input payload for the Lambda function invocation. The following rules apply:
- A batch of records will belong to the same topic and partition.
- A batch always has records in increasing order of the offset.
- The total number of records in a batch is at most the configured aws.lambda.batch.size, and the size of the batch is less than the AWS Lambda payload limits.
- To disable batching, set aws.lambda.batch.size to 1.
Response Topic
In sync mode, the connector can optionally log the response from Lambda to a Kafka topic.
Your AWS Lambda function must return JSON in the following format:
[
{
"payload": {
"timestamp": 1562844607000,
"topic": "mytopic",
"partition": 1,
"offset": 43822,
"result": .....
}
},
{
"payload": {
"timestamp": 1562844608000,
"topic": "mytopic",
"partition": 1,
"offset": 43823,
"result": .....
}
}
....
]
The connector makes the following assumptions:
- The output must be an array, and the length must match the length of the input array.
- The topic, partition and offset of each record in the output must match the topic, partition and offset of records in the input batch.
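As a sketch, a handler satisfying these assumptions can echo the coordinates of each input record alongside its result. Here, process() is a hypothetical stand-in for your business logic:

def handler(event, context):
    if not event:  # connector dry-run at startup
        return []
    # One output entry per input record, echoing the same coordinates.
    return [
        {
            "payload": {
                "timestamp": r["payload"]["timestamp"],
                "topic": r["payload"]["topic"],
                "partition": r["payload"]["partition"],
                "offset": r["payload"]["offset"],
                "result": process(r["payload"]["value"]),
            }
        }
        for r in event
    ]

def process(value):
    return {"status": "ok"}  # placeholder result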
The connector stores the output of each record from the AWS Lambda function response in the configured Kafka topic:
- The key contains the input record coordinates, i.e., topic, partition and offset. These can be used to match the output record to the input record.
- The value contains the entire payload from response as a string.
To enable a response topic, set aws.lambda.response.topic to the topic where you want to log the responses. Additionally, add producer properties with the prefix aws.lambda.response.*
Here is a sample set of properties to add to configure a response topic:
aws.lambda.response.topic=<Required Configuration>
aws.lambda.response.bootstrap.servers=<Required Configuration>
aws.lambda.response.client.id=lambda-response-producer
# You can add any other producer properties with the prefix aws.lambda.response.
The default implementation of result.handler.class expects the AWS Lambda function to respond with a JSON array containing one JSON object for each Kafka record in the batched payload. The semantics of the response are expected to match those of the batched payload. Each JSON entry from the response is logged separately to the configured Kafka response topic.
Note that in async mode, the connector cannot log the response.
Response Schema
If configured, each JSON entry from the AWS Lambda response is converted to a Kafka record and written to the configured Kafka response topic.
The key has the following schema:
Field | Schema | Description
---|---|---
topic | STRING | The topic from which records were pulled to create the input payload for the Lambda function.
partition | INT32 | The partition from which records were pulled to create the input payload for this Lambda execution.
offset | INT64 | The offset of the input record. This can be used to determine the payload that was sent to Lambda.
The value is obtained by extracting the payload from the JSON entry.
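For example, here is a minimal sketch that consumes the response topic and matches each response back to its input record using the key coordinates. It assumes the confluent-kafka Python client, JSON-serialized keys, and a hypothetical response topic named lambda-responses:

import json
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "lambda-response-reader",  # hypothetical consumer group
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["lambda-responses"])  # hypothetical response topic name

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    # The key carries the input record coordinates: topic, partition, offset.
    key = json.loads(msg.key())
    print(f'response for {key["topic"]}-{key["partition"]}@{key["offset"]}: '
          f'{msg.value().decode("utf-8")}')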
Error Handling
The AWS Lambda sink connector can encounter the following types of errors:
- Transient errors, such as network timeouts or errors caused by rate limiting.
- Configuration errors, such as an incorrect Lambda function name or access-related issues.
- Errors encountered during execution of the Lambda function. These errors are further classified as handled or unhandled.
The AWS Lambda sink connector automatically handles transient errors such as network timeouts. The connector relies on the AWS Lambda SDK to perform retries.
In case of configuration errors, the connector does not retry. Instead, it throws a ConnectException and stops the task.
In case of errors encountered during execution of the Lambda function, the behavior of the connector depends on the configuration parameters aws.lambda.invocation.type and behavior.on.error.
aws.lambda.invocation.type | behavior.on.error | Error Handling
---|---|---
async | (any) | In asynchronous mode, the connector relies on AWS Lambda to perform retries and error handling. AWS Lambda retries the function twice (three attempts in total), after which it discards the event. You should configure a dead letter queue if you want to track the input events that failed.
sync | fail | fail is the default mode. The connector stops processing records for that TopicPartition. Records for other TopicPartitions continue to be processed.
sync | log | The connector logs the error message and continues processing the next batch of records. If aws.lambda.error.topic is configured, the error is stored in that Kafka topic. Otherwise, it is logged to the error stream and can be viewed in the Connect logs.
Logging Errors to a Kafka Topic
The connector can optionally log errors to a Kafka topic. To enable error logging, you must have the following configuration:
aws.lambda.invocation.type=sync
behavior.on.error=log
aws.lambda.error.topic=<Required Configuration>
aws.lambda.error.bootstrap.servers=<Required Configuration>
aws.lambda.error.client.id=lambda-error-producer
# You can add any other producer properties with the prefix aws.lambda.error.
Error Schema
If configured, the error response from AWS Lambda is converted to a Kafka record and written to the configured error topic.
The key has the following schema:
Field | Schema | Description
---|---|---
topic | STRING | The topic from which records were pulled to create the input payload for the Lambda function.
partition | INT32 | The partition from which records were pulled to create the input payload for this Lambda execution.
startOffset | INT64 | The starting offset (inclusive) of the input records. This can be used to determine the payload that was sent to Lambda.
endOffset | INT64 | The ending offset (exclusive) of the input records. This can be used to determine the payload that was sent to Lambda.
The value has the following schema:
Field | Schema | Description
---|---|---
functionName | STRING | The AWS Lambda function that had an error.
inputPayload | Optional STRING | The input JSON that was provided to the Lambda function.
statusCode | INT32 | Same as the response schema.
functionError | Optional STRING | Same as the response schema, but guaranteed to be non-null.
logResult | Optional STRING | Same as the response schema.
payload | Optional STRING | A JSON string describing the error. Typically, this includes the stack trace and other details identifying the problem.
executedVersion | Optional STRING | Same as the response schema.