Azure Functions Sink Connector for Confluent Platform

The Kafka Connect Azure Functions Sink connector integrates Kafka with Azure Functions.

The connector consumes records from Kafka topic(s) and executes an Azure Function. Each request sent to Azure Functions can contain up to max.batch.size records. The connector can also send many requests concurrently; the limit is set by max.outstanding.requests.

The target function must be configured and ready to accept requests with the following JSON format:

[
  {
    "key": ...,
    "value": ...,
    "topic": string,
    "partition": <number>,
    "offset": <number>,
    "timestamp": <number>
  },
  ...
]

Note

The key and value are encoded as follows:

  • String, int, long, float, double, boolean, null are encoded as-is into JSON.
  • Structs are converted to JSON and exported without the schema.
  • byte[] is encoded as a base64 String and sent as a JSON string.
  • Any other Java objects are converted to String using toString(), and then sent as JSON strings.

The connector receives the responses from the Azure Function and writes them to a result or error topic (set by configuration), depending on the HTTP response code. Response codes of 400 and above are treated as errors; anything below 400 is treated as a success.
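
The routing rule can be sketched as follows. This only mirrors the documented behavior and is not taken from the connector's source; the function name reportTopicFor is hypothetical, and the topic names are the ones used in the quick start on this page.

```javascript
// Illustrative only: picks the report topic for a response, following the
// documented rule that status codes of 400 and above are errors.
function reportTopicFor(statusCode, topics) {
  return statusCode >= 400 ? topics.error : topics.result;
}

// Topic names as configured through reporter.result.topic.name and
// reporter.error.topic.name in the quick start.
const reportTopics = { result: 'test-result', error: 'test-error' };

console.log(reportTopicFor(200, reportTopics));
console.log(reportTopicFor(404, reportTopics));
```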

The connector attempts to map each response to a single record before producing it to the corresponding topic. It can receive the responses from the Azure Function in the following three formats.

  1. The first format is a JSON list of objects, each carrying a result and the Kafka coordinates of the record it belongs to:

    [
      {
        "payload": {
          "result": ...,
          "topic": string,
          "partition": <number>,
          "offset": <number>
        }
      },
      ...
    ]
    

    This list can be out of order relative to the order in which the records were sent. The connector matches each result to its record by its Kafka coordinates. However, the list must be one-to-one with the list of records sent in the request.

  2. The second format is a JSON list:

    [
      ...,
      ...,
      ...
    ]
    

    As long as the list is one-to-one with the list of records, the connector assumes it is in the same order as the request and matches each entry to the corresponding record.

  3. The third format is any response that does not match either of the formats above. The connector reports the entire response for each individual record (one-to-many correlation).
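
The first two response formats can be sketched from the function's side as follows. All helper names here are hypothetical. The compute callback stands in for whatever the function does with each record, and matchByCoordinates only illustrates the idea of matching by Kafka coordinates; it is not the connector's implementation.

```javascript
// Format 1: one object per record, carrying the record's Kafka coordinates,
// so the list may be returned in any order.
function toCoordinateResponse(records, compute) {
  return records.map((r) => ({
    payload: {
      result: compute(r),
      topic: r.topic,
      partition: r.partition,
      offset: r.offset,
    },
  }));
}

// Format 2: a plain list with one entry per record, in request order.
function toOrderedResponse(records, compute) {
  return records.map((r) => compute(r));
}

// Illustration of how an out-of-order format 1 response can be matched back
// to the records that produced it.
function matchByCoordinates(records, response) {
  const byCoordinates = new Map(
    response.map(({ payload: p }) => [`${p.topic}/${p.partition}/${p.offset}`, p.result])
  );
  return records.map((r) => byCoordinates.get(`${r.topic}/${r.partition}/${r.offset}`));
}
```

Format 1 costs a few extra fields per entry but lets the function process records in any order. Format 2 is shorter but depends on the function preserving request order.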

Features

The Azure Functions Sink connector includes the following features:

At least once delivery

This connector guarantees that records from the Kafka topic are delivered at least once.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The Azure Functions Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve throughput when the connector consumes from multiple topic partitions.

Install the Azure Functions Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • You must install the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-azure-functions:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-azure-functions:1.0.0-preview
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Azure Functions Sink Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Quick Start

This quick start uses the Azure Functions Sink connector to consume records and execute an example Azure Function.

Prerequisites
  1. Before starting the connector, create and deploy an Azure Functions instance.

    Important

    Make sure to select the Node.js runtime stack and to create the function in-portal.

    • Copy and paste this code into the index.js file in the portal.

      module.exports = async function (context, req) {
          context.log('JavaScript HTTP trigger function processed a request.');
          context.res = {
              status: 200,
              body: req.body
          };
      };
      
    • Copy the function URL and function key from the portal and save them for later. Azure Functions should now be set up for the connector.

      Note

      This example function just mirrors the request and sends the request body back in the response.

  2. Install the connector using the confluent connect plugin install command.

    # run from your CP installation directory
    confluent connect plugin install confluentinc/kafka-connect-azure-functions:latest
    
  3. Start Confluent Platform using the Confluent CLI commands.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services start
    
  4. Produce test data to the functions-test topic in Kafka using the Confluent CLI confluent local produce command.

    echo key1,value1 | confluent local produce functions-test --property parse.key=true --property key.separator=,
    echo key2,value2 | confluent local produce functions-test --property parse.key=true --property key.separator=,
    echo key3,value3 | confluent local produce functions-test --property parse.key=true --property key.separator=,
    
  5. Create an azure-functions-sink.json file with the following contents:

    {
      "name": "azure-functions",
      "config": {
        "topics": "functions-test",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.azure.functions.AzureFunctionsSinkConnector",
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter":"org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "function.url": "<the copied function url>",
        "function.key": "<the copied function key>",
        "reporter.bootstrap.servers": "localhost:9092",
        "reporter.error.topic.name": "test-error",
        "reporter.error.topic.replication.factor": 1,
        "reporter.error.topic.key.format": "string",
        "reporter.error.topic.value.format": "string",
        "reporter.result.topic.name": "test-result",
        "reporter.result.topic.key.format": "string",
        "reporter.result.topic.value.format": "string",
        "reporter.result.topic.replication.factor": 1
      }
    }
    

    Caution

    Do not forget to change the function.url and function.key values in the JSON file to the copied function URL and function key, respectively.

  6. Load the Azure Functions Sink connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local load azure-functions --config path/to/azure-functions-sink.json
    

    Important

    Don’t use the Confluent CLI commands in production environments.

  7. Confirm that the connector is in a RUNNING state.

    confluent local status azure-functions
    
  8. Confirm that the messages were delivered to the result topic in Kafka.

    confluent local consume test-result --from-beginning
    
  9. Clean up resources.

    • Delete the connector

      confluent local unload azure-functions
      
    • Stop Confluent Platform

      confluent local services stop
      
    • Delete the created Azure Function in the Azure portal.
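
The mirror function from step 1 can also be exercised locally, without Azure, to see what it would send back to the connector. The following is a minimal sketch under stated assumptions: invokeLocally and its stub context are hypothetical and provide only the members the function touches, and the partition, offset, and timestamp values in the sample batch are made up.

```javascript
// The mirror function from step 1 of the quick start.
const mirror = async function (context, req) {
  context.log('JavaScript HTTP trigger function processed a request.');
  context.res = { status: 200, body: req.body };
};

// Hypothetical local harness: a stub context with only `log` and `res`.
// The real Azure Functions context object has more members.
async function invokeLocally(handler, body) {
  const context = { log: () => {}, res: undefined };
  await handler(context, { body });
  return context.res;
}

// Roughly the batch the connector would send for the first test record.
const sampleBatch = [
  { key: 'key1', value: 'value1', topic: 'functions-test', partition: 0, offset: 0, timestamp: 0 },
];

invokeLocally(mirror, sampleBatch).then((res) => {
  console.log(res.status, JSON.stringify(res.body));
});
```

Because the mirrored body is a list with one entry per record, it appears to fit the second response format described earlier, so each record's entry in the result topic would be that record's own request object.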