Google Cloud Functions Sink Connector for Confluent Platform
The Kafka Connect Google Cloud Functions Sink Connector integrates Apache Kafka®
with Google Cloud Functions. The connector consumes records from Kafka topic(s)
and executes a Google Cloud Function. Each request sent to Google Cloud
Functions can contain up to max.batch.size records. The connector can also
send many requests concurrently, with the maximum number of in-flight requests
set by max.pending.requests.
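For example, adding the following properties to the connector configuration
(the values here are illustrative, not defaults) limits each request to 10
records and allows at most 5 concurrent requests:

"max.batch.size": "10",
"max.pending.requests": "5"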
The connector also supports invoking authenticated functions, that is, functions
with private access. To use this functionality, you must provide service account
credentials using either the gcf.credentials.json or gcf.credentials.path
configuration option. The service account whose credentials are provided must
also have, at a minimum, the cloudfunctions.functions.get and
cloudfunctions.functions.invoke IAM permissions assigned to it. If the required
credentials are not provided, it is assumed that the Google Cloud Function can
be invoked without any authentication.
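For example, for a private function you might point the connector at a
downloaded service account key file (the path below is a placeholder):

"gcf.credentials.path": "/path/to/service-account-key.json"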
The target function must be configured and ready to accept requests with the
following JSON format:
[
  {
    "key": ...,
    "value": ...,
    "topic": string,
    "partition": <number>,
    "offset": <number>,
    "timestamp": <number>
  },
  ...
]
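For example, a single record with a String key and value (the values below are
illustrative, matching the quick start data later in this page) would be
delivered to the function as:

[
  {
    "key": "key1",
    "value": "value1",
    "topic": "functions-messages",
    "partition": 0,
    "offset": 0,
    "timestamp": 1562844607000
  }
]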
Note
The key and value are encoded as follows:
- String, int, long, float, double, boolean, and null are encoded as-is into JSON.
- Structs are converted to JSON and exported without the schema.
- byte[] is encoded as a base64 String and sent as a JSON string.
- Any other Java objects are converted to String using toString(), and then sent as JSON strings.
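As an illustration (a hypothetical record, not part of the quick start), a
record with a byte[] key and a Struct value might appear in the request as:

[
  {
    "key": "aGVsbG8=",
    "value": {"id": 42, "name": "example"},
    "topic": "functions-messages",
    "partition": 0,
    "offset": 7,
    "timestamp": 1562844607000
  }
]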
The connector receives the responses from the Google Cloud Function and writes
each response to a result or error topic (set by configurations) depending on
the HTTP response code. Response codes 400 and above are considered errors and
anything below is a success.
The connector attempts to map each response to a single record before producing
it to the corresponding topic. It can receive the responses from the Google
Cloud Function in the following three formats.
The first format is JSON:
[
  {
    "payload": {
      "result": ...,
      "topic": string,
      "partition": <number>,
      "offset": <number>
    }
  },
  ...
]
This list can be out of order relative to the order in which the records were
provided. The connector will correctly match each record to its result based on
the record's Kafka coordinates. However, the list must correspond one-to-one
with the list of records that were sent in the request.
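For example, a first-format response for the single-record request shown
earlier might look like the following (the result value is illustrative):

[
  {
    "payload": {
      "result": "processed value1",
      "topic": "functions-messages",
      "partition": 0,
      "offset": 0
    }
  }
]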
The second format is a plain JSON list. As long as the list corresponds
one-to-one with the list of records sent in the request, it is assumed to be
ordered and each element is matched with the corresponding record.
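For example, if the request contained three records, an ordered response such
as the following (illustrative values) would be matched element by element with
those records:

[
  "processed value1",
  "processed value2",
  "processed value3"
]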
The third format is any response that does not satisfy either of the above
formats. The connector will report the entire response for each individual
record (one-to-many correlation).
Features
At least once delivery
This connector guarantees that records from the Kafka topic are delivered at
least once.
Install the Google Cloud Functions Connector
You can install this connector by using the Confluent Hub client installation
instructions or by manually
downloading the ZIP file.
Prerequisites
Note
You must install the connector on every machine where Connect will run.
An install of the Confluent Hub Client.
Note
This is installed by default with Confluent Enterprise.
An install of the latest (latest) connector version.
To install the latest connector version, navigate to your Confluent Platform
installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-gcp-functions:latest
You can install a specific version by replacing latest with a version number
as shown in the following example:
confluent-hub install confluentinc/kafka-connect-gcp-functions:1.0.0-preview
Quick Start
This quick start uses the Google Cloud Functions Sink Connector to consume
records and send them to a Google Cloud Function.
- Prerequisites

  Before starting the connector, create and deploy a basic Google Cloud Functions instance.
  - Navigate to the Google Cloud Console.
  - Go to the Cloud Functions tab.
  - Create a new function.
  - To create an unauthenticated function, select Allow unauthenticated
    invocations and continue.
  - For authenticated functions, select Require authentication, and then
    click Variables, Networking and Advanced Settings to display additional
    settings. Click the Service account drop-down and select the desired service
    account.
  - Note the project ID, the region, and the function name, as they are used
    later.
  - To add an invoker account for an already deployed function, click
    Add members on the Permissions tab of the function's home page. In the
    popup, add the member and select the Cloud Functions Invoker role.
Install the connector through the Confluent Hub
Client.
# run from your CP installation directory
confluent-hub install confluentinc/kafka-connect-gcp-functions:latest
Start Confluent Platform.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local. For example, the syntax for
confluent start is now confluent local services start. For more information,
see confluent local.
confluent local services start
Produce test data to the functions-messages topic in Kafka using the CLI
command below.
echo key1,value1 | confluent local services kafka produce functions-messages --property parse.key=true --property key.separator=,
echo key2,value2 | confluent local services kafka produce functions-messages --property parse.key=true --property key.separator=,
echo key3,value3 | confluent local services kafka produce functions-messages --property parse.key=true --property key.separator=,
Create a gcp-functions.json file with the following contents:
{
  "name": "gcp-functions",
  "config": {
    "topics": "functions-messages",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.gcp.functions.GoogleCloudFunctionsSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": 1,
    "function.name": "<insert function name here>",
    "project.id": "<insert project id here>",
    "region": "<insert region here>",
    "gcf.credentials.path": "<insert path to service credentials JSON here>",
    "reporter.bootstrap.servers": "localhost:9092",
    "reporter.error.topic.name": "test-error",
    "reporter.error.topic.replication.factor": 1,
    "reporter.error.topic.key.format": "string",
    "reporter.error.topic.value.format": "string",
    "reporter.result.topic.name": "test-result",
    "reporter.result.topic.key.format": "string",
    "reporter.result.topic.value.format": "string",
    "reporter.result.topic.replication.factor": 1
  }
}
Note
For details about using this connector with Kafka Connect Reporter, see
Connect Reporter.
Load the Google Cloud Functions Sink Connector.
Caution
You must include a double dash (--) between the connector name and your flag.
For more information, see this post.
confluent local services connect connector load gcp-functions --config gcp-functions.json
Important
Don’t use the CLI commands in production environments.
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status gcp-functions
Confirm that the messages were delivered to the result topic in Kafka:
confluent local services kafka consume test-result --from-beginning
Cleanup resources
Additional Documentation