.. _lambda-sink-connector:

|lambda| Sink Connector for |cp|
================================

The |kconnect-long| |lambda| sink connector pulls records from one or more |ak-tm| topics,
converts them to JSON, and executes an |lambda| function. The response from the |lambda|
function can optionally be written to another |ak| topic.

The |lambda| function can be invoked either synchronously or asynchronously.

In synchronous mode, records within a topic and partition are processed sequentially. Records
within different topic partitions, though, can be processed in parallel. If configured, the
response from |lambda| can be written to a |ak| topic. In case of errors during Lambda
execution, the connector can be configured to either ignore and proceed, log the error, or stop
the connector completely.

In asynchronous mode, the connector operates in *fire-and-forget* mode. Records are processed
on a best-effort, sequential basis. The connector does not attempt any retries. |lambda|
automatically retries up to two times, after which |lambda| can move the request to a
`dead letter queue `__.

The connector guarantees at-least-once processing semantics. Under certain circumstances, it is
possible that a record is processed more than once. You should
`design your AWS Lambda function to be idempotent `__. If you have configured the connector to
log the response from |lambda| to a |ak| topic, the topic can contain duplicate records. You can
enable |ak| log compaction on the topic to remove duplicate records. Alternatively, you can
write a KSQL query to detect duplicate records in a time window.

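
As a rough illustration of the idempotency advice above, the following Python sketch skips
records whose topic, partition, and offset have already been handled. It assumes the batched
payload format described in the |lambda| Payload section. The names ``handler``,
``process_record``, and ``_seen`` are hypothetical, and the in-memory set is only a stand-in:
a real function would need a durable store, because memory is not shared across Lambda
execution environments.

.. code-block:: python

   # Hypothetical stand-in for a durable store such as a database table.
   # An in-memory set only lives as long as this execution environment.
   _seen = set()


   def process_record(payload):
       """Placeholder for the real, side-effecting business logic."""
       return payload["value"]


   def handler(event, context):
       """Process each record at most once per (topic, partition, offset)."""
       processed = 0
       for entry in event:
           payload = entry["payload"]
           coordinates = (payload["topic"], payload["partition"], payload["offset"])
           if coordinates in _seen:
               continue  # Duplicate delivery: skip the side effect.
           process_record(payload)
           _seen.add(coordinates)
           processed += 1
       # The return value is for illustration only.
       return {"processed": processed}

Invoking this handler twice with the same batch performs the work only on the first call, which
is the property that at-least-once delivery requires of the function.
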

Prerequisites
-------------

The following are required to run the |kconnect-long| |lambda| Sink Connector:

* |ak| Broker: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* |kconnect|: |cp| 4.0.0 or above, or |ak| 1.0.0 or above
* Java 1.8
* AWS credentials (see `Access Key ID and Secret Access Key `__)

--------------------------------------
Exporting |aws| Credentials and Region
--------------------------------------

Before you can run this connector, you must provide credentials and the region where the
|lambda| function is located.

Exporting environment variables is sufficient for a development and testing environment.
However, in a production environment, you should provide credentials as part of the worker
process itself using the configuration property ``aws.credentials.provider.class``. This is the
credentials provider or provider chain to use for authentication to |aws|. By default, the
connector uses ``DefaultAWSCredentialsProviderChain``. For details on configuring a credentials
provider, see :ref:`configurable-credentials`. The information provided in
:ref:`configurable-credentials` is applicable for most connectors accessing resources in
|aws-long|, including the |lambda| Sink connector.

Export the following |aws| environment variables to allow the connector to access |lambda|.
These environment variables must be exported where the |kconnect-long| worker processes and the
connector are deployed.

* ``AWS_DEFAULT_REGION``
* ``AWS_ACCESS_KEY_ID``
* ``AWS_SECRET_ACCESS_KEY``

To export these environment variables, enter the following commands:

::

   export AWS_DEFAULT_REGION=

::

   export AWS_ACCESS_KEY_ID=

::

   export AWS_SECRET_ACCESS_KEY=

Install the |lambda| Connector
------------------------------

.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-aws-lambda:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-aws-lambda:1.0.0-preview

------------------------------
Install the connector manually
------------------------------

`Download and extract the ZIP file `__ for your connector and then follow the manual connector
installation :ref:`instructions `.

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`aws-lambda-sink-connector-license-config` for license properties and
:ref:`lambda-sink-license-topic-configuration` for information about the license topic.

----------------------
Property-based example
----------------------

This configuration is typically used with :ref:`standalone workers `.

.. code-block:: properties
   :emphasize-lines: 5,7

   name=LambdaSinkConnector
   connector.class=io.confluent.connect.aws.lambda.AwsLambdaSinkConnector
   tasks.max=1
   topics=
   aws.lambda.function.name=
   aws.lambda.invocation.type=sync
   aws.lambda.batch.size=50
   behavior.on.error=fail
   confluent.topic.bootstrap.servers=localhost:9092
   confluent.topic.replication.factor=1

------------------
REST-based example
------------------

This configuration is typically used with :ref:`distributed workers `. Write the following JSON
to ``connector.json`` and configure all of the required values. Use the command below to post
the configuration to one of the distributed |kconnect-long| workers. See |kconnect-long|
:ref:`REST API ` for more information.

.. code-block:: bash
   :emphasize-lines: 7,9

   {
     "name": "LambdaSinkConnector",
     "config" : {
       "connector.class" : "io.confluent.connect.aws.lambda.AwsLambdaSinkConnector",
       "tasks.max" : "1",
       "topics" : "< Required Configuration >",
       "aws.lambda.function.name" : "< Required Configuration >",
       "aws.lambda.invocation.type" : "sync",
       "aws.lambda.batch.size" : "50",
       "behavior.on.error" : "fail",
       "confluent.topic.bootstrap.servers" : "localhost:9092",
       "confluent.topic.replication.factor" : "1"
     }
   }

Use curl to post the configuration to one of the |kconnect-long| workers.
Change ``http://localhost:8083/`` to the endpoint of one of your |kconnect-long| workers.

.. code-block:: bash

   curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

|lambda| Payload
----------------

The default payload converter converts |ak| records to a payload in the form of a JSON array.
The following is a sample payload:

.. code-block:: bash

   [
     {
       "payload": {
         "timestamp": 1562844607000,
         "topic": "mytopic",
         "partition": 1,
         "offset": 43822,
         "key": .....,
         "value": .....
       }
     },
     {
       "payload": {
         "timestamp": 1562844608000,
         "topic": "mytopic",
         "partition": 1,
         "offset": 43823,
         "key": .....,
         "value": .....
       }
     }
     ....
   ]

The ``key`` and ``value`` are converted to either JSON primitives or objects according to their
schema. If no schema is defined, they are encoded as plain strings.

For any |lambda| invocation, all the records belong to the same topic and partition, and the
offsets are in strictly increasing order.

When the connector starts, it makes a dry-run invocation of the Lambda Function with an empty
payload. This validates parameter values and verifies that the user or role has permission to
invoke the function.

Batching Records
----------------

The |lambda| sink connector combines multiple records into the input payload for the Lambda
Function invocation. The following rules apply:

* A batch of records always belongs to the same topic and partition.
* A batch always has records in increasing order of the offset.
* The total number of records in a batch is less than or equal to the configuration
  ``aws.lambda.batch.size``, and the size of the batch is less than the
  `AWS Lambda Payload Limits `__.
* To disable batching, set ``aws.lambda.batch.size`` to 1.

Response Topic
--------------

In ``sync`` mode, the connector can optionally log the response from |lambda| in a |ak| topic.
Your |lambda| function must return JSON in the following format:

.. code-block:: bash

   [
     {
       "payload": {
         "timestamp": 1562844607000,
         "topic": "mytopic",
         "partition": 1,
         "offset": 43822,
         "result": .....
       }
     },
     {
       "payload": {
         "timestamp": 1562844608000,
         "topic": "mytopic",
         "partition": 1,
         "offset": 43823,
         "result": .....
       }
     }
     ....
   ]

The connector makes the following assumptions:

* The output must be an array and the length must match the length of the input array.
* The topic, partition, and offset of each record in the output must match the topic,
  partition, and offset of records in the input batch.

The connector stores the output of each record from the |lambda| function response in the
configured |ak| topic.

* The key contains the input record coordinates (that is, the topic, partition, and offset).
  These can be used to match the output record to the input record.
* The value contains the entire payload from the response as a string.

To enable a response topic, set ``aws.lambda.response.topic`` to the topic where you want to
log the responses. Additionally, add producer properties with the prefix
``aws.lambda.response.*``.

The following shows a sample set of properties to add to configure a response topic:

.. code-block:: properties
   :name: additional properties for response logging
   :emphasize-lines: 1,3

   aws.lambda.response.topic=
   aws.lambda.response.bootstrap.servers=
   aws.lambda.response.client.id=lambda-response-producer
   # You can add any other producer properties with the prefix aws.lambda.response.

The default implementation of ``result.handler.class`` expects a JSON array consisting of JSON
objects as a response from the |lambda| function to each |ak| record in the batched payload.
The semantics of a response from the |lambda| function are expected to be the same as those of
the batched payload received. Each JSON entry from the response is separately logged into a
configured |ak| response topic.

Note that in ``async`` mode, the connector cannot log the response.
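
The response format described in this section can be sketched as a Python handler. This is a
minimal illustration under stated assumptions, not the connector's reference implementation:
the names ``handler`` and ``transform`` are hypothetical, and ``transform`` stands in for
whatever work the function performs on each record value.

.. code-block:: python

   def transform(value):
       """Hypothetical per-record logic; replace with real processing."""
       return {"length": len(str(value))}


   def handler(event, context):
       """Return one response entry per input record, in the same order."""
       response = []
       for entry in event:
           record = entry["payload"]
           response.append({
               "payload": {
                   # Echo the coordinates so each result can be matched
                   # to the input record it belongs to.
                   "timestamp": record["timestamp"],
                   "topic": record["topic"],
                   "partition": record["partition"],
                   "offset": record["offset"],
                   "result": transform(record["value"]),
               }
           })
       return response

Because the handler emits exactly one entry per input record and copies the topic, partition,
and offset unchanged, the output array has the same length as the input batch and the same
record coordinates, which are the two assumptions listed above.
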

---------------
Response Schema
---------------

If configured, each JSON entry from the |lambda| response is converted to a |ak| record and
written to the configured |ak| response topic.

The key has the following schema:

+-------------+--------+--------------------------------------------------------------+
| Field       | Schema | Description                                                  |
+=============+========+==============================================================+
| topic       | String | The topic from which records were pulled to create the       |
|             |        | input payload for the Lambda Function.                       |
+-------------+--------+--------------------------------------------------------------+
| partition   | INT32  | The partition from which records were pulled to create the   |
|             |        | input payload for the Lambda execution.                      |
+-------------+--------+--------------------------------------------------------------+
| offset      | INT64  | The offset of the input record. This can be used to          |
|             |        | determine the payload that was sent to |lambda|.             |
+-------------+--------+--------------------------------------------------------------+

The value is obtained by extracting the ``payload`` from the JSON entry.

Error Handling
--------------

The |lambda| sink connector may encounter the following types of errors:

* Transient errors such as network timeouts or errors because of rate limiting.
* Configuration errors such as an incorrect Lambda Function name or access-related issues.
* Errors encountered during execution of the Lambda Function. These errors are further
  classified as handled or unhandled.

The |lambda| sink connector automatically handles transient errors such as network timeouts.
The connector relies on the |lambda| SDK to perform retries.
In case of configuration errors, the connector does not retry. Instead, it throws a
``ConnectException`` and stops the task.

In case of errors encountered during execution of the Lambda Function, the behavior of the
connector depends on the configuration parameters ``aws.lambda.invocation.type`` and
``behavior.on.error``.

+----------------------------+-------------------+--------------------------------------------------------------+
| aws.lambda.invocation.type | behavior.on.error | Error Handling                                               |
+============================+===================+==============================================================+
| async                      |                   | In asynchronous mode, the connector relies on |lambda| to    |
|                            |                   | perform retries and error handling. |lambda| retries the     |
|                            |                   | function twice (a total of three attempts), after which it   |
|                            |                   | discards the event. You should configure a                   |
|                            |                   | `dead letter queue `__ if you want to track the input        |
|                            |                   | events that failed.                                          |
+----------------------------+-------------------+--------------------------------------------------------------+
| sync                       | fail              | ``fail`` is the default mode. The connector stops            |
|                            |                   | processing records for that TopicPartition. Records for      |
|                            |                   | other TopicPartitions will continue to process.              |
+----------------------------+-------------------+--------------------------------------------------------------+
| sync                       | log               | The connector will log the error message and continue        |
|                            |                   | processing the next batch of records. If                     |
|                            |                   | ``aws.lambda.error.topic`` is configured, the error is       |
|                            |                   | stored in the |ak| topic. Otherwise, it is logged to the     |
|                            |                   | error stream and can be viewed in the connect logs.          |
+----------------------------+-------------------+--------------------------------------------------------------+

----------------------------
Logging Errors to |ak| topic
----------------------------

The connector can optionally log errors to a |ak| topic. Use the following configuration
settings to enable error logging:

.. code-block:: properties
   :name: additional properties for error logging
   :emphasize-lines: 4,6

   aws.lambda.invocation.type=sync
   behavior.on.error=log

   aws.lambda.error.topic=
   aws.lambda.error.bootstrap.servers=
   aws.lambda.error.client.id=lambda-error-producer
   # You can add any other producer properties with the prefix aws.lambda.error.

------------
Error Schema
------------

If configured, the error response from |lambda| is converted to a record and written to the
|ak| error topic.
The key has the following schema:

+-------------+--------+--------------------------------------------------------------+
| Field       | Schema | Description                                                  |
+=============+========+==============================================================+
| topic       | String | The topic from which records were pulled to create the       |
|             |        | input payload for the Lambda Function.                       |
+-------------+--------+--------------------------------------------------------------+
| partition   | INT32  | The partition from which records were pulled to create the   |
|             |        | input payload for the Lambda execution.                      |
+-------------+--------+--------------------------------------------------------------+
| startOffset | INT64  | The starting offset (inclusive) of the input records. This   |
|             |        | can be used to determine the payload that was sent           |
|             |        | to |lambda|.                                                 |
+-------------+--------+--------------------------------------------------------------+
| endOffset   | INT64  | The ending offset (exclusive) of the input records. This     |
|             |        | can be used to determine the payload that was sent           |
|             |        | to |lambda|.                                                 |
+-------------+--------+--------------------------------------------------------------+

The value has the following schema:

+-----------------+-----------------+--------------------------------------------------------------+
| Field           | Schema          | Description                                                  |
+=================+=================+==============================================================+
| functionName    | String          | The Lambda Function that had an error.                       |
+-----------------+-----------------+--------------------------------------------------------------+
| inputPayload    | Optional String | The input JSON that was provided to the Lambda Function.     |
+-----------------+-----------------+--------------------------------------------------------------+
| statusCode      | INT32           | Same as response schema.                                     |
+-----------------+-----------------+--------------------------------------------------------------+
| functionError   | Optional String | Same as response schema, but guaranteed to be non-null.      |
+-----------------+-----------------+--------------------------------------------------------------+
| logResult       | Optional String | Same as response schema.                                     |
+-----------------+-----------------+--------------------------------------------------------------+
| payload         | Optional String | A JSON string describing the error. Typically, this          |
|                 |                 | includes the stack trace and other details identifying the   |
|                 |                 | problem.                                                     |
+-----------------+-----------------+--------------------------------------------------------------+
| executedVersion | Optional String | Same as response schema.                                     |
+-----------------+-----------------+--------------------------------------------------------------+

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   lambda_sink_connector_config
   changelog