.. _connect_gcp-functions-connector:

Google Cloud Functions Sink Connector for |cp|
==============================================

The |kconnect-long| Google Cloud Functions Sink Connector integrates |ak-tm|
with Google Cloud Functions. The connector consumes records from |ak| topics
and executes a Google Cloud Function. Each request sent to Google Cloud
Functions can contain up to ``max.batch.size`` records. The connector can also
send many requests concurrently, set by ``max.outstanding.requests``.

The target function must be configured and ready to accept requests with the
following JSON format:

.. codewithvars:: bash

   [
     {
       "key": ...,
       "value": ...,
       "topic": string,
       "partition": int,
       "offset": long,
       "timestamp": long
     },
     ...,
   ]

.. note::

   The key and value are encoded as follows:

   * ``String``, ``int``, ``long``, ``float``, ``double``, ``boolean``, and
     ``null`` are encoded as-is into JSON.
   * ``Structs`` are converted to JSON and exported *without* the schema.
   * ``byte[]`` is encoded as a base64 ``String`` and sent as a JSON string.
   * Any other Java objects are converted to ``String`` using ``toString()``
     and then sent as JSON strings.

The connector receives the responses from the Google Cloud Function and writes
them to a result or error topic (set by configuration properties) depending on
the HTTP response code. Response codes ``400`` and above are considered errors
and anything below is a success. The connector attempts to map each response to
a single record before producing it to the corresponding topic. It can receive
the responses from the Google Cloud Function in the following three formats.

1. The first format is JSON:

   .. codewithvars:: bash

      [
        {
          "payload": {
            "result": ...,
            "topic": string,
            "partition": int,
            "offset": long,
          }
        },
        ...
      ]

   This list can be out of order relative to the order in which the records
   were provided. The connector correctly matches each record to its result
   based on its |ak| coordinates.
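As an illustration of the request format and this first response format, the following is a minimal Python sketch of a target function's handler body. The ``handle_batch`` helper, the sample topic name, and the coordinate values are hypothetical, and a real Cloud Function would receive the request body through its HTTP framework rather than as a plain string:

```python
import json

def handle_batch(request_body):
    """Hypothetical handler: echo each record's value back using the
    connector's first (JSON) response format, copying the Kafka
    coordinates so the connector can match each result to its record."""
    records = json.loads(request_body)
    responses = []
    for record in records:
        responses.append({
            "payload": {
                "result": record["value"],   # any JSON-encodable result
                "topic": record["topic"],
                "partition": record["partition"],
                "offset": record["offset"],
            }
        })
    return json.dumps(responses)

# A two-record request body shaped like the format shown above
# (topic name, coordinates, and timestamps are made up for the example):
request = json.dumps([
    {"key": "k1", "value": "v1", "topic": "functions-messages",
     "partition": 0, "offset": 12, "timestamp": 1600000000000},
    {"key": "k2", "value": "v2", "topic": "functions-messages",
     "partition": 1, "offset": 7, "timestamp": 1600000000001},
])
print(handle_batch(request))
```

Because each ``payload`` carries the topic, partition, and offset, a function written this way may return the list in any order.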
   However, the list must map one-to-one to the list of records that were sent
   in the request.

2. The second format is a JSON list:

   .. codewithvars:: bash

      [
        ...,
        ...,
        ...
      ]

   As long as the list maps one-to-one to the list of records, the list is
   assumed to be ordered and is matched with the corresponding records.

3. The third format is any format that does not satisfy either of the above
   formats. The connector reports the entire response for each individual
   record (a one-to-many correlation).

Install the Google Cloud Functions Connector
--------------------------------------------

.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

::

   confluent-hub install confluentinc/kafka-connect-gcp-functions:latest

.. include:: ../includes/connector-install-version.rst

::

   confluent-hub install confluentinc/kafka-connect-gcp-functions:1.0.0-preview

--------------------------
Install Connector Manually
--------------------------

`Download and extract the ZIP file `__ for your connector and then follow the
manual connector installation :ref:`instructions `.

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`gcp-functions-sink-connector-license-config` for license properties
and :ref:`gcp_functions_sink_license-topic-configuration` for information about
the license topic.

Quick Start
-----------

This quick start uses the Google Cloud Functions Sink Connector to consume
records and send them to a Google Cloud Functions function.

.. include:: ../../includes/install-cli-prereqs.rst

#. Before starting the connector, create and deploy a basic Google Cloud
   Functions instance.

   * Navigate to the `Google Cloud Console `_.
   * Go to the `Cloud Functions `_ tab.
   * Create a new function. Use the default code that is provided.
   * Note down the project ID, the region, and the function name, as they will
     be used later.

#. Install the connector through the :ref:`confluent_hub_client`.
   .. codewithvars:: bash

      # Run from your CP installation directory
      confluent-hub install confluentinc/kafka-connect-gcp-functions:latest

#. Start |cp|.

   .. include:: ../../includes/cli-new.rst

   .. codewithvars:: bash

      |confluent_start|

#. Produce test data to the ``functions-messages`` topic in |ak| using the CLI
   commands below.

   .. codewithvars:: bash

      echo key1,value1 | confluent local produce functions-messages -- --property parse.key=true --property key.separator=,
      echo key2,value2 | confluent local produce functions-messages -- --property parse.key=true --property key.separator=,
      echo key3,value3 | confluent local produce functions-messages -- --property parse.key=true --property key.separator=,

#. Create a ``gcp-functions.json`` file with the following contents:

   .. codewithvars:: json

      {
        "name": "gcp-functions",
        "config": {
          "topics": "functions-messages",
          "tasks.max": "1",
          "connector.class": "io.confluent.connect.gcp.functions.GoogleCloudFunctionsSinkConnector",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "value.converter": "org.apache.kafka.connect.storage.StringConverter",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": 1,
          "function.name": "",
          "project.id": "",
          "region": "",
          "reporter.bootstrap.servers": "localhost:9092",
          "reporter.error.topic.name": "test-error",
          "reporter.error.topic.replication.factor": 1,
          "reporter.error.topic.key.format": "string",
          "reporter.error.topic.value.format": "string",
          "reporter.result.topic.name": "test-result",
          "reporter.result.topic.key.format": "string",
          "reporter.result.topic.value.format": "string",
          "reporter.result.topic.replication.factor": 1
        }
      }

#. Load the Google Cloud Functions Sink Connector.

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. codewithvars:: bash

      |confluent_load| gcp-functions|dash| -d gcp-functions.json

   .. important:: Don't use the CLI commands in production environments.

#. Confirm that the connector is in a ``RUNNING`` state.
   .. codewithvars:: bash

      |confluent_status| gcp-functions

#. Confirm that the messages were delivered to the result topic in |ak|.

   .. codewithvars:: bash

      |confluent_consume| test-result|dash| --from-beginning

#. Clean up resources.

   * Delete the connector.

     .. codewithvars:: bash

        |confluent_unload| gcp-functions

   * Stop |cp|.

     .. codewithvars:: bash

        |confluent_stop|

   * Delete the created Google Cloud Function in the Google Cloud Platform
     portal.

Additional Documentation
------------------------

.. toctree::
   :titlesonly:

   connector_config
   changelog
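The quick start above leaves ``function.name``, ``project.id``, and ``region`` blank in ``gcp-functions.json``, and forgetting to fill them in is an easy mistake before loading the connector. The following is a hedged Python sketch of a pre-load sanity check; the ``unset_keys`` helper and the choice of which keys to check are assumptions for illustration, not part of the connector:

```python
import json

# Minimal copy of the quick-start connector config (the reporter.* and
# converter keys are omitted for brevity); function.name, project.id, and
# region are blank exactly as in the gcp-functions.json template above.
CONFIG = {
    "name": "gcp-functions",
    "config": {
        "topics": "functions-messages",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.gcp.functions.GoogleCloudFunctionsSinkConnector",
        "function.name": "",
        "project.id": "",
        "region": "",
    },
}

# Keys the quick start expects you to fill in by hand.
MUST_BE_SET = ("function.name", "project.id", "region")

def unset_keys(payload):
    """Return the hand-filled keys that are still empty."""
    cfg = payload["config"]
    return [k for k in MUST_BE_SET if not cfg.get(k)]

print(json.dumps(unset_keys(CONFIG)))
```

Running the check against the unmodified template reports all three keys as unset; once they are filled in with the values noted down when creating the function, the list is empty and the file is ready to load.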