Google Cloud Functions Sink Connector for Confluent Platform

The Kafka Connect Google Cloud Functions Sink connector integrates Apache Kafka® with Google Cloud Functions. The connector consumes records from one or more Kafka topics and invokes a Google Cloud Function with them. Each request sent to Google Cloud Functions can contain up to max.batch.size records. The connector can also send multiple requests concurrently, up to the limit set by max.pending.requests.
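As a rough illustration of how max.batch.size bounds the size of each request, the following Python sketch splits a list of records into batches. The function name split_into_batches and the sample values are hypothetical, and this is not the connector's actual implementation.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def split_into_batches(records: Sequence[T], max_batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive batches holding at most max_batch_size records each."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    for start in range(0, len(records), max_batch_size):
        yield list(records[start:start + max_batch_size])


# Seven records with a batch size of 3 are grouped into three requests.
batches = list(split_into_batches(["r1", "r2", "r3", "r4", "r5", "r6", "r7"], 3))
```

The last batch may be smaller than max.batch.size, since the setting is an upper bound rather than a fixed size.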

The connector also supports invoking authenticated functions, that is, functions with private access. To use this functionality, you must provide service account credentials using either the gcf.credentials.json or the gcf.credentials.path configuration option. The service account whose credentials are provided must have at least the cloudfunctions.functions.get and cloudfunctions.functions.invoke IAM permissions. If you don't provide credentials, the connector assumes that the Google Cloud Function can be invoked without authentication.

The target function must be configured and ready to accept requests with the following JSON format:

[
  {
    "key": ...,
    "value": ...,
    "topic": string,
    "partition": <number>,
    "offset": <number>,
    "timestamp": <number>
  },
  ...
]
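A minimal sketch of a Python HTTP function that accepts this request format might look like the following. It assumes a Python runtime in which the entry point receives a Flask-style request object that exposes get_json(); the name handle_batch and the upper-casing step are hypothetical placeholders for real processing. The response uses the coordinate-keyed list format that this page describes for function responses.

```python
import json


def handle_batch(request):
    """HTTP entry point: process a batch of Kafka records sent by the connector.

    `request` is assumed to be a Flask-style request object with get_json().
    """
    records = request.get_json(silent=True)
    if not isinstance(records, list):
        # A status of 400 or above marks the request as an error for the connector.
        return (json.dumps({"error": "expected a JSON list of records"}), 400,
                {"Content-Type": "application/json"})

    results = []
    for record in records:
        results.append({
            "payload": {
                # Hypothetical processing step: upper-case the string form of the value.
                "result": str(record.get("value")).upper(),
                "topic": record["topic"],
                "partition": record["partition"],
                "offset": record["offset"],
            }
        })
    return (json.dumps(results), 200, {"Content-Type": "application/json"})
```

Returning the topic, partition, and offset with each result lets the connector match results to records even if the function processes them out of order.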

Note

The key and value are encoded as follows:

  • String, int, long, float, double, boolean, null are encoded as-is into JSON.
  • Structs are converted to JSON and exported without the schema.
  • byte[] is encoded as a base64 String and sent as a JSON string.
  • Any other Java objects are converted to String using toString(), and then sent as JSON strings.
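The encoding rules above can be approximated in Python as follows. This is only an illustration of the rules, not the connector's Java code: encode_field is a hypothetical name, a Python dict stands in for a Connect Struct, and str() stands in for Java's toString().

```python
import base64
import json


def encode_field(value):
    """Return a JSON-ready form of a record key or value, following the rules above."""
    if value is None or isinstance(value, (str, bool, int, float)):
        return value  # primitives and null are emitted as-is
    if isinstance(value, (bytes, bytearray)):
        return base64.b64encode(bytes(value)).decode("ascii")  # bytes -> base64 string
    if isinstance(value, dict):
        # Stand-in for a Struct: field values only, no schema.
        return {name: encode_field(field) for name, field in value.items()}
    return str(value)  # any other object -> its string form


encoded = json.dumps({"key": encode_field(b"key1"), "value": encode_field({"id": 7, "ok": True})})
```

A function that receives binary keys or values therefore needs to base64-decode the corresponding JSON string to recover the original bytes.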

The connector receives the responses from the Google Cloud Function and writes them to a result topic or an error topic (set by configuration properties), depending on the HTTP response code. Response codes of 400 and above are considered errors; anything below 400 is considered a success.
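The status-code rule can be expressed as a one-line check. The sketch below is illustrative only; destination_topic is a hypothetical helper, and the topic names are the ones used in the quick start on this page.

```python
def destination_topic(status_code: int, result_topic: str, error_topic: str) -> str:
    """Pick the reporting topic for a response based on its HTTP status code."""
    return error_topic if status_code >= 400 else result_topic


ok_topic = destination_topic(200, "test-result", "test-error")
failed_topic = destination_topic(500, "test-result", "test-error")
```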

The connector attempts to map each response to a single record before producing it to the corresponding topic. It can receive the responses from the Google Cloud Function in the following three formats.

  1. The first format is a JSON list of objects, each carrying a result together with the Kafka coordinates of its record:

    [
      {
        "payload": {
          "result": ...,
          "topic": string,
          "partition": <number>,
          "offset": <number>
        }
      },
      ...
    ]
    

    This list can be in a different order from the records that were sent. The connector matches each result to its record based on the Kafka coordinates (topic, partition, and offset). However, the list must correspond one-to-one with the list of records sent in the request.

  2. The second format is a JSON list:

    [
      ...,
      ...,
      ...
    ]
    

    As long as the list corresponds one-to-one with the list of records, it is assumed to be in the same order as the records, and each item is matched with the record at the same position.

  3. The third format can be any format that does not satisfy either of the above formats. The connector will report the entire response for each individual record (one-to-many correlation).
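The three cases above can be sketched as a fallback chain. This Python sketch only illustrates the matching logic as described here; it is not the connector's implementation, and match_responses is a hypothetical name. It assumes the response body is a string and that each record is a dict holding its topic, partition, and offset.

```python
import json


def match_responses(records, response_body):
    """Pair each sent record with a result, trying the three formats in order."""
    try:
        parsed = json.loads(response_body)
    except ValueError:
        parsed = None

    if isinstance(parsed, list) and len(parsed) == len(records):
        # Format 1: every item carries the Kafka coordinates of its record.
        by_coordinates = {}
        required = {"result", "topic", "partition", "offset"}
        for item in parsed:
            payload = item.get("payload") if isinstance(item, dict) else None
            if not isinstance(payload, dict) or not required <= payload.keys():
                by_coordinates = None
                break
            coordinates = (payload["topic"], payload["partition"], payload["offset"])
            by_coordinates[coordinates] = payload["result"]
        if by_coordinates is not None:
            keys = [(r["topic"], r["partition"], r["offset"]) for r in records]
            if all(key in by_coordinates for key in keys):
                return [(r, by_coordinates[key]) for r, key in zip(records, keys)]
        # Format 2: a plain list, assumed to be in the same order as the records.
        return list(zip(records, parsed))

    # Format 3: anything else; report the whole response for every record.
    return [(r, response_body) for r in records]
```

Because the first format is matched by coordinates, it is the only one that tolerates reordering; the second relies purely on position.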

Features

At least once delivery

This connector guarantees that records from the Kafka topic are delivered at least once.

Limitations

The connector does not currently support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:

  • io.debezium.transforms.ByLogicalTableRouter
  • io.debezium.transforms.outbox.EventRouter
  • org.apache.kafka.connect.transforms.RegexRouter
  • org.apache.kafka.connect.transforms.TimestampRouter
  • io.confluent.connect.transforms.MessageTimestampRouter
  • io.confluent.connect.transforms.ExtractTopic$Key
  • io.confluent.connect.transforms.ExtractTopic$Value

Install the Google Cloud Functions Connector

You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Prerequisites

Note

You must install the connector on every machine where Connect will run.

  • An installation of the Confluent Hub Client.

    Note

    This is installed by default with Confluent Enterprise.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent-hub install confluentinc/kafka-connect-gcp-functions:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent-hub install confluentinc/kafka-connect-gcp-functions:1.0.0-preview
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and Confluent License Properties for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Google Cloud Functions Sink Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Quick Start

This quick start uses the Google Cloud Functions Sink connector to consume records and send them to a Google Cloud Functions function.

Prerequisites
  1. Before starting the connector, create and deploy a basic Google Cloud Function.

    • Navigate to the Google Cloud Console.
    • Go to the Cloud Functions tab.
    • Create a new function.
    • To create an unauthenticated function, select Allow unauthenticated invocations and continue.
    • To create an authenticated function, select Require Authentication, and then click Variables, Networking and Advanced Settings to display additional settings. Click the Service account drop-down and select the desired service account.
    • Note the project ID, the region, and the function name; you will need them later.
    • To add an invoker account to an already deployed function, click Add members in the Permissions tab of the function's home page. In the dialog, add the member and select the Cloud Functions Invoker role.
  2. Install the connector through the Confluent Hub Client.

    # run from your CP installation directory
    confluent-hub install confluentinc/kafka-connect-gcp-functions:latest
    
  3. Start Confluent Platform.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services start
    
  4. Produce test data to the functions-messages topic in Kafka using the CLI command below.

    echo key1,value1 | confluent local services kafka produce functions-messages --property parse.key=true --property key.separator=,
    echo key2,value2 | confluent local services kafka produce functions-messages --property parse.key=true --property key.separator=,
    echo key3,value3 | confluent local services kafka produce functions-messages --property parse.key=true --property key.separator=,
    
  5. Create a gcp-functions.json file with the following contents:

    {
      "name": "gcp-functions",
      "config": {
        "topics": "functions-messages",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.gcp.functions.GoogleCloudFunctionsSinkConnector",
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter":"org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor":1,
        "function.name": "<insert function name here>",
        "project.id": "<insert project id here>",
        "region": "<insert region here>",
        "gcf.credentials.path": "<insert path to service credentials JSON here>",
        "reporter.bootstrap.servers": "localhost:9092",
        "reporter.error.topic.name": "test-error",
        "reporter.error.topic.replication.factor": 1,
        "reporter.error.topic.key.format": "string",
        "reporter.error.topic.value.format": "string",
        "reporter.result.topic.name": "test-result",
        "reporter.result.topic.key.format": "string",
        "reporter.result.topic.value.format": "string",
        "reporter.result.topic.replication.factor": 1
      }
    }
    

    Note

    For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

  6. Load the Google Cloud Functions Sink connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services connect connector load gcp-functions --config gcp-functions.json
    

    Important

    Don’t use the CLI commands in production environments.

  7. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status gcp-functions
    
  8. Confirm that the messages were delivered to the result topic in Kafka.

    confluent local services kafka consume test-result --from-beginning
    
  9. Clean up resources.

    • Delete the connector

      confluent local services connect connector unload gcp-functions
      
    • Stop Confluent Platform

      confluent local services stop
      
    • Delete the created Google Cloud Function in the Google Cloud Platform portal.
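Because the CLI commands above are intended for development only, the load and status steps can also be performed through the Kafka Connect REST API. The following Python sketch assumes a Connect worker whose REST interface listens on http://localhost:8083, which is an assumption about your environment; the helper names are hypothetical. It posts the gcp-functions.json file from step 5 to POST /connectors and reads GET /connectors/<name>/status.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: adjust to your Connect worker


def build_create_request(config_path: str) -> urllib.request.Request:
    """Build a POST /connectors request from a file such as gcp-functions.json."""
    with open(config_path, "rb") as config_file:
        body = config_file.read()
    json.loads(body)  # fail early if the file is not valid JSON
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def fetch_status(name: str) -> dict:
    """GET /connectors/<name>/status and decode the JSON body."""
    with urllib.request.urlopen(f"{CONNECT_URL}/connectors/{name}/status") as response:
        return json.load(response)


def is_running(status: dict) -> bool:
    """Return True when the connector and all of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)
```

With a worker running, urllib.request.urlopen(build_create_request("gcp-functions.json")) would submit the connector, and is_running(fetch_status("gcp-functions")) would check that the connector and its tasks have started.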