Kafka Connect Google Cloud Functions Sink Connector¶
The Kafka Connect Google Cloud Functions Sink Connector integrates Apache Kafka® with Google Cloud Functions.
The connector consumes records from Kafka topic(s) and executes a Google Cloud Function. Each request sent to Google Cloud Functions can contain up to `max.batch.size` records. The connector can also send multiple requests concurrently, controlled by `max.outstanding.requests`.
The target function must be configured and ready to accept requests with the following JSON format:
```json
[
  {
    "key": ...,
    "value": ...,
    "topic": string,
    "partition": <number>,
    "offset": <number>,
    "timestamp": <number>
  },
  ...
]
```
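As a rough illustration, a function that accepts a batch in this shape might parse it as follows. This is a minimal sketch, not Google's or Confluent's code; the `handle_batch` helper name and the sample record values are hypothetical.

```python
import json

def handle_batch(body: str):
    """Parse the JSON batch the connector sends (hypothetical helper).

    In a real HTTP-triggered Cloud Function, this would be called with the
    raw request body inside the function's entry point.
    """
    records = json.loads(body)
    # Each element carries the record plus its Kafka coordinates.
    return [(r["topic"], r["partition"], r["offset"], r["value"]) for r in records]

# Example batch in the documented shape (sample values are made up).
batch = json.dumps([
    {"key": "k1", "value": "v1", "topic": "functions-messages",
     "partition": 0, "offset": 42, "timestamp": 1550000000000},
])
print(handle_batch(batch))  # [('functions-messages', 0, 42, 'v1')]
```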
Note
The key and value are encoded as follows:

- `String`, `int`, `long`, `float`, `double`, `boolean`, and `null` are encoded as-is into JSON.
- `Structs` are converted to JSON and exported without the schema.
- `byte[]` is encoded as a base64 `String` and sent as a JSON string.
- Any other Java objects are converted to `String` using `toString()`, and then sent as JSON strings.
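These rules can be sketched in Python as a rough analogue of the Java-side behavior. This is not the connector's actual code; the `encode_field` name is hypothetical, and a `dict` stands in for a Kafka Connect `Struct`.

```python
import base64

def encode_field(v):
    """Hypothetical sketch of the documented key/value encoding rules."""
    if v is None or isinstance(v, (str, int, float, bool)):
        return v  # primitives and null pass through as-is
    if isinstance(v, (bytes, bytearray)):
        # byte[] becomes a base64 String sent as a JSON string
        return base64.b64encode(bytes(v)).decode("ascii")
    if isinstance(v, dict):
        return v  # stand-in for a Struct: exported as JSON, schema dropped
    return str(v)  # everything else via toString()

print(encode_field(b"hello"))  # aGVsbG8=
print(encode_field(3.14))      # 3.14
```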
The connector receives the responses from the Google Cloud Function and writes them to a result or error topic (set by configuration properties) depending on the HTTP response code: codes 400 and above are considered errors, and anything below 400 is a success.
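The routing rule amounts to a single comparison on the status code. A minimal sketch, with the topic names taken from the quick start configuration later in this page (the `route_topic` helper itself is hypothetical):

```python
def route_topic(status_code: int, result_topic: str, error_topic: str) -> str:
    # 400 and above are errors; anything below is a success.
    return error_topic if status_code >= 400 else result_topic

print(route_topic(200, "test-result", "test-error"))  # test-result
print(route_topic(404, "test-result", "test-error"))  # test-error
```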
The connector attempts to map each response to a single record before producing it to the corresponding topic. It can receive the responses from the Google Cloud Function in the following three formats.
The first format is JSON:
```json
[
  {
    "payload": {
      "result": ...,
      "topic": string,
      "partition": <number>,
      "offset": <number>
    }
  },
  ...
]
```
This list can be out of order relative to the order in which the records were provided. The connector correctly matches each record to its result based on its Kafka coordinates. However, the list must correspond one-to-one to the list of records that were sent in the request.
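The matching performed for this format can be sketched as a lookup keyed on (topic, partition, offset). This is an illustrative reimplementation under the documented rules, not the connector's source; all names and values are made up.

```python
def match_results(records, responses):
    """Match first-format response payloads back to records by Kafka coordinates."""
    by_coord = {
        (p["payload"]["topic"], p["payload"]["partition"], p["payload"]["offset"]):
            p["payload"]["result"]
        for p in responses
    }
    # The response list may be out of order; the coordinates restore the pairing.
    return [by_coord[(r["topic"], r["partition"], r["offset"])] for r in records]

records = [
    {"topic": "t", "partition": 0, "offset": 1},
    {"topic": "t", "partition": 0, "offset": 2},
]
responses = [  # deliberately out of order relative to the records
    {"payload": {"result": "second", "topic": "t", "partition": 0, "offset": 2}},
    {"payload": {"result": "first", "topic": "t", "partition": 0, "offset": 1}},
]
print(match_results(records, responses))  # ['first', 'second']
```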
The second format is a JSON list:
[ ..., ..., ... ]
As long as the list is one-to-one to the list of records, the list will be assumed to be ordered and matched with the corresponding records.
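For this second format the pairing is purely positional; a short sketch of the assumption (again an illustration, not the connector's code):

```python
def match_ordered(records, results):
    # The plain JSON list is assumed ordered and one-to-one with the records as sent.
    if len(records) != len(results):
        raise ValueError("response list must be one-to-one with the records")
    return list(zip(records, results))

print(match_ordered(["r1", "r2"], ["a", "b"]))  # [('r1', 'a'), ('r2', 'b')]
```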
The third format is any response that does not satisfy either of the above formats. The connector reports the entire response for each individual record (a one-to-many correlation).
Install Google Cloud Functions Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run this command to install the latest (`latest`) connector version.
The connector must be installed on every machine where Connect will be run.
confluent-hub install confluentinc/kafka-connect-gcp-functions:latest
You can install a specific version by replacing `latest` with a version number. For example:
confluent-hub install confluentinc/kafka-connect-gcp-functions:1.0.0-preview
Install Connector Manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Quick Start¶
This quick start uses the Google Cloud Functions Sink Connector to consume records and send them to a Google Cloud Functions function.
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
Before starting the connector, create and deploy a basic Google Cloud Functions instance.
- Navigate to the Google Cloud Console.
- Go to the Cloud Functions tab.
- Create a new function. Use the default code that is provided.
- Note down the project ID, the region, and the function name, as they will be used later.
Install the connector through the Confluent Hub Client.
```
# run from your CP installation directory
confluent-hub install confluentinc/kafka-connect-gcp-functions:latest
```
Start Confluent Platform.
confluent start
Produce test data to the `functions-messages` topic in Kafka using the CLI commands below.

```
echo key1,value1 | confluent local produce functions-messages -- --property parse.key=true --property key.separator=,
echo key2,value2 | confluent local produce functions-messages -- --property parse.key=true --property key.separator=,
echo key3,value3 | confluent local produce functions-messages -- --property parse.key=true --property key.separator=,
```
Create a `gcp-functions.json` file with the following contents:

```json
{
  "name": "gcp-functions",
  "config": {
    "topics": "functions-messages",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.gcp.functions.GoogleCloudFunctionsSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": 1,
    "function.name": "<insert function name here>",
    "project.id": "<insert project id here>",
    "region": "<insert region here>",
    "reporter.bootstrap.servers": "localhost:9092",
    "reporter.error.topic.name": "test-error",
    "reporter.error.topic.replication.factor": 1,
    "reporter.error.topic.key.format": "string",
    "reporter.error.topic.value.format": "string",
    "reporter.result.topic.name": "test-result",
    "reporter.result.topic.key.format": "string",
    "reporter.result.topic.value.format": "string",
    "reporter.result.topic.replication.factor": 1
  }
}
```
Load the Google Cloud Functions Sink Connector.
confluent load gcp-functions -d gcp-functions.json
Important
Don’t use the CLI commands in production environments.
Confirm that the connector is in a `RUNNING` state.

confluent status gcp-functions
Confirm that the messages were delivered to the result topic in Kafka.
confluent consume test-result --from-beginning
Clean up resources
Delete the connector
confluent unload gcp-functions
Stop Confluent Platform
confluent stop
Delete the created Google Cloud Function in the Google Cloud Platform portal.