.. _connect_azure_functions_connector:

|az| Functions Sink Connector for |cp|
======================================

The |kconnect-long| |az| Functions Sink Connector integrates |ak| with |az| Functions.
The connector consumes records from |ak| topic(s) and executes an |az| Function. Each
request sent to |az| Functions can contain up to ``max.batch.size`` records. The
connector can also send many requests concurrently, set by ``max.outstanding.requests``.

The target function must be configured and ready to accept requests with the following
JSON format:

.. codewithvars:: bash

   [
     {
       "key": ...,
       "value": ...,
       "topic": string,
       "partition": ...,
       "offset": ...,
       "timestamp": ...
     },
     ...,
   ]

.. note:: The key and value are encoded as follows:

   * ``String``, ``int``, ``long``, ``float``, ``double``, ``boolean``, and ``null``
     are encoded as-is into JSON.
   * ``Structs`` are converted to JSON and exported *without* the schema.
   * ``byte[]`` is encoded as a base64 ``String`` and sent as a JSON string.
   * Any other Java objects are converted to ``String`` using ``toString()`` and then
     sent as JSON strings.

The connector receives the responses from the |az| Function and writes them to a result
or error topic (set by configurations) depending on the HTTP response code. Response
codes ``400`` and above are considered errors, and anything below is a success. The
connector attempts to map each response to a single record before producing it to the
corresponding topic. It can receive the responses from the |az| Function in the
following three formats.

#. The first format is JSON:

   .. codewithvars:: bash

      [
        {
          "payload": {
            "result": ...,
            "topic": string,
            "partition": ...,
            "offset": ...
          }
        },
        ...
      ]

   This list can be out of order relative to the order in which the records were
   provided. The connector will correctly match each record to its result based on the
   record's |ak| coordinates. However, the list must correspond one-to-one to the list
   of records that were sent in the request.

#. The second format is a JSON list:

   .. codewithvars:: bash
      [
        ...,
        ...,
        ...
      ]

   As long as the list corresponds one-to-one to the list of records, the list is
   assumed to be ordered and is matched with the corresponding records.

#. The third format can be any format that does not satisfy either of the above
   formats. The connector will report the entire response for each individual record
   (one-to-many correlation).

Install the |az| Functions Connector
------------------------------------

.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

::

   confluent-hub install confluentinc/kafka-connect-azure-functions:latest

.. include:: ../includes/connector-install-version.rst

::

   confluent-hub install confluentinc/kafka-connect-azure-functions:1.0.0-preview

--------------------------
Install Connector Manually
--------------------------

`Download and extract the ZIP file `_ for your connector and then follow the manual
connector installation :ref:`instructions `.

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`azure-functions-sink-connector-license-config` for license properties and
:ref:`azure_functions_sink_license-topic-configuration` for information about the
license topic.

Quick Start
-----------

This quick start uses the |az| Functions Sink Connector to consume records and execute
an example |az| Function.

.. include:: ../../includes/install-cli-prereqs.rst

#. Before starting the connector, create and deploy an |az| Functions instance.

   * Navigate to the Microsoft `Azure Portal `_.
   * Create a FunctionApp following this `quickstart guide `_.

     .. important:: Make sure to select the Node.js runtime stack and to create the
        function in-portal.

   * Copy and paste this code into the ``index.js`` file in the portal.

     .. codewithvars:: javascript
        module.exports = async function (context, req) {
            context.log('JavaScript HTTP trigger function processed a request.');
            context.res = {
                status: 200,
                body: req.body
            };
        };

   * Copy the function URL from the portal and save it for later.

   |az| Functions should now be set up for the connector.

   .. note:: This example function simply mirrors the request and sends the request
      body back in the response.

#. Install the connector through the :ref:`confluent_hub_client`.

   .. codewithvars:: bash

      # run from your CP installation directory
      confluent-hub install confluentinc/kafka-connect-azure-functions:latest

#. Start |cp| using the :ref:`Confluent CLI ` commands.

   .. include:: ../../includes/cli-new.rst

   .. codewithvars:: bash

      |confluent_start|

#. Produce test data to the ``functions-test`` topic in |ak| using the :ref:`cli`
   |confluent_produce| command.

   .. codewithvars:: bash

      echo key1,value1 | confluent local produce functions-test -- --property parse.key=true --property key.separator=,
      echo key2,value2 | confluent local produce functions-test -- --property parse.key=true --property key.separator=,
      echo key3,value3 | confluent local produce functions-test -- --property parse.key=true --property key.separator=,

#. Create an ``azure-functions-sink.json`` file with the following contents:

   .. codewithvars:: bash
      {
        "name": "azure-functions",
        "config": {
          "topics": "functions-test",
          "tasks.max": "1",
          "connector.class": "io.confluent.connect.azure.functions.AzureFunctionsSinkConnector",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "value.converter": "org.apache.kafka.connect.storage.StringConverter",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": "1",
          "function.url": "",
          "reporter.bootstrap.servers": "localhost:9092",
          "reporter.error.topic.name": "test-error",
          "reporter.error.topic.replication.factor": 1,
          "reporter.error.topic.key.format": "string",
          "reporter.error.topic.value.format": "string",
          "reporter.result.topic.name": "test-result",
          "reporter.result.topic.key.format": "string",
          "reporter.result.topic.value.format": "string",
          "reporter.result.topic.replication.factor": 1
        }
      }

   .. caution:: Do not forget to change the ``function.url`` value in the JSON file to
      the copied function URL.

#. Load the |az| Functions Sink Connector.

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. codewithvars:: bash

      |confluent_load| azure-functions|dash| -d path/to/azure-functions-sink.json

   .. important:: Don't use the :ref:`Confluent CLI ` commands in production
      environments.

#. Confirm that the connector is in a ``RUNNING`` state.

   .. codewithvars:: bash

      |confluent_status| azure-functions

#. Confirm that the messages were delivered to the result topic in |ak|.

   .. codewithvars:: bash

      |confluent_consume| test-result|dash| --from-beginning

#. Clean up resources.

   * Delete the connector.

     .. codewithvars:: bash

        |confluent_unload| azure-functions

   * Stop |cp|.

     .. codewithvars:: bash

        |confluent_stop|

   * Delete the created |az| Function in the |az| portal.

Additional Documentation
------------------------

.. toctree::
   :titlesonly:

   connector_config
   changelog