.. _connect_google_pubsub:

.. |pubsub-long| replace:: Google Cloud Pub/Sub
.. |pubsub| replace:: Pub/Sub

|kconnect-long| |pubsub-long| Source Connector
==============================================

The |pubsub-long| source connector reads messages from a |pubsub| topic and writes them to an |ak| topic. The schema for the |ak| record key and value is described in :ref:`pubsub_record-schema`.

The |pubsub| connector uses a `pull strategy `_ to get messages from the |pubsub| topic. The messages are pulled `synchronously `_.

Features
--------

The |pubsub-long| source connector offers the following features:

* **At Least Once Delivery**: The connector guarantees that messages from |pubsub| are delivered to the |ak| topic at least once.

* **No Ordering Guarantees**: |pubsub-long| provides a highly available, scalable message delivery service. The tradeoff for these properties is that the *order* in which messages are received by subscribers is `not guaranteed `_. As a result, the records written to the |ak| topic may end up in a different order.

* **Fetch Multiple Messages**: In every poll cycle, the connector fetches up to ``gcp.pubsub.message.max.count`` messages. By default, this value is 10000. If your messages are exceptionally large, you may want to lower this value.

.. note:: When creating the ``subscription`` for the |pubsub| topic, set the ``acknowledgement deadline`` high enough to avoid duplicate records in the |ak| topic. This gives the connector time to commit the records and send an acknowledgement for each |pubsub| message processed. If the connector fails to write records to the |ak| topic, the messages in the |pubsub| topic are made available again.
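As a sketch of the note above, the acknowledgement deadline can be raised at subscription creation time with the ``--ack-deadline`` flag of ``gcloud`` (the value is in seconds; the topic and subscription names here are illustrative placeholders):

```shell
# Create a subscription with the maximum 600-second acknowledgement
# deadline, giving the connector more time to write each batch to
# Kafka and acknowledge the messages before Pub/Sub redelivers them.
gcloud pubsub subscriptions create subscription-1 \
  --topic topic-1 \
  --ack-deadline 600
```

If the deadline is too short relative to the connector's poll-and-commit cycle, Pub/Sub redelivers unacknowledged messages, producing duplicates in the |ak| topic.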
Prerequisites
-------------

The following are required to run the |kconnect-long| |pubsub-long| Source Connector:

* |ak| Broker: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* |kconnect|: |cp| 4.0.0 or above, or |ak| 1.0.0 or above
* Java 1.8

Install |pubsub-long| Connector
-------------------------------

.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-gcp-pubsub:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-gcp-pubsub:1.0.0-preview

--------------------------
Install Connector Manually
--------------------------

`Download and extract the ZIP file `_ for your connector and then follow the manual connector installation :ref:`instructions `.

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`gcp-pubsub-connector-license-config` for license properties and :ref:`gcp-pubsub-license-topic-configuration` for information about the license topic.

Quick Start
-----------

This quick start uses the |pubsub-long| source connector to read messages from |pubsub| and write them to an |ak| topic.

---------------
Start Confluent
---------------

Start the Confluent services using the following :ref:`cli` command:

.. codewithvars:: bash

   |confluent_start|

.. important:: Do not use the :ref:`cli` in production environments.

----------------------------------
Publish messages to |pubsub| topic
----------------------------------

To publish messages to a |pubsub| topic, a topic and a subscription must first be created. Make sure you have the permissions required to create a |pubsub| topic and a |pubsub| subscription. Review `Quickstart using the gcloud command-line tool `_.

Create a |pubsub| topic called ``topic-1``:

.. code-block:: bash

   gcloud pubsub topics create topic-1

Create a |pubsub| subscription called ``subscription-1``:

.. code-block:: bash

   gcloud pubsub subscriptions create --topic topic-1 subscription-1

Publish three messages to ``topic-1``:

.. code-block:: bash

   gcloud pubsub topics publish topic-1 --message "Peter"
   gcloud pubsub topics publish topic-1 --message "Megan"
   gcloud pubsub topics publish topic-1 --message "Erin"

----------------------
Property-based example
----------------------

Create a ``pubsub-source.properties`` file with the following contents. This configuration is typically used with :ref:`standalone workers `.

::

   name=pubsub-source
   connector.class=io.confluent.connect.gcp.pubsub.PubSubSourceConnector
   tasks.max=1
   kafka.topic=pubsub-topic
   gcp.pubsub.project.id=project-1
   gcp.pubsub.topic.id=topic-1
   gcp.pubsub.subscription.id=subscription-1
   gcp.pubsub.credentials.path=/home/some_directory/credentials.json
   confluent.topic.bootstrap.servers=localhost:9092
   confluent.topic.replication.factor=1

Run the connector with this configuration.

.. codewithvars:: bash

   |confluent_load| pubsub-source|dash| -d pubsub-source.properties

Confirm that the connector is in a ``RUNNING`` state.

.. codewithvars:: bash

   |confluent_status| pubsub-source

------------------
REST-based example
------------------

Use this setting with :ref:`distributed workers `. Write the following JSON to ``pubsub-source.json``, configure all of the required values, and use the following command to post the configuration to one of the distributed connect workers. See the |kconnect-long| :ref:`REST API ` for more information.

.. code-block:: json

   {
     "name" : "pubsub-source",
     "config" : {
       "connector.class" : "io.confluent.connect.gcp.pubsub.PubSubSourceConnector",
       "tasks.max" : "1",
       "kafka.topic" : "pubsub-topic",
       "gcp.pubsub.project.id" : "project-1",
       "gcp.pubsub.topic.id" : "topic-1",
       "gcp.pubsub.subscription.id" : "subscription-1",
       "gcp.pubsub.credentials.path" : "/home/some_directory/credentials.json",
       "confluent.topic.bootstrap.servers" : "localhost:9092",
       "confluent.topic.replication.factor" : "1"
     }
   }

Use ``curl`` to post the configuration to one of the |kconnect-long| workers. Change ``http://localhost:8083/`` to the endpoint of one of your |kconnect-long| workers.

.. code-block:: bash

   curl -s -X POST -H 'Content-Type: application/json' --data @pubsub-source.json http://localhost:8083/connectors

Use the following command to update the configuration of an existing connector.

.. code-block:: bash

   curl -s -X PUT -H 'Content-Type: application/json' --data @pubsub-source.json http://localhost:8083/connectors/pubsub-source/config

To consume records written by the connector to the configured |ak| topic, run the following command:

.. codewithvars:: bash

   kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic pubsub-topic --from-beginning

.. _pubsub_record-schema:

Record Schema
-------------

Each |pubsub| message is converted into exactly one |ak| record, with the following structure:

#. The |ak| key consists of the ``project Id``, ``message Id``, and ``subscription Id`` of the |pubsub| message.
#. Optionally, the attributes from the ``attribute map`` of the |pubsub| message can also be made part of the |ak| key.
#. The |ak| value consists of the ``message data`` and ``message attributes`` of the |pubsub| message.
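The mapping above can be illustrated with a minimal Python sketch. This is not connector source code; it assumes a |pubsub| message represented as a plain dict and uses the field names from the key and value schema tables that follow:

```python
# Sketch only: how one Pub/Sub message maps to exactly one Kafka record.
def to_kafka_record(project_id, topic_id, subscription_id, message):
    """Build the Kafka key and value structs for a Pub/Sub message.

    `message` is a dict with 'message_id', 'data', and 'attributes'.
    """
    key = {
        "ProjectId": project_id,
        "TopicId": topic_id,
        "SubscriptionId": subscription_id,
        "MessageId": message["message_id"],
    }
    value = {
        "MessageData": message.get("data"),         # optional field
        "AttributeMap": message.get("attributes"),  # optional field
    }
    return key, value

key, value = to_kafka_record(
    "project-1", "topic-1", "subscription-1",
    {"message_id": "42", "data": "Peter", "attributes": {"lang": "en"}},
)
```

Because the key includes the message ID and subscription ID, downstream consumers can detect duplicates introduced by at-least-once delivery.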
----------
Key Schema
----------

The key is a ``struct`` with the following fields:

+--------------------+-------------+------------------------------------------------------------------------------+
| Field Name         | Schema Type | Description                                                                  |
+====================+=============+==============================================================================+
| ``ProjectId``      | String      | The |pubsub| project containing the topic from which messages are polled.    |
+--------------------+-------------+------------------------------------------------------------------------------+
| ``TopicId``        | String      | The |pubsub| topic containing the messages.                                  |
+--------------------+-------------+------------------------------------------------------------------------------+
| ``SubscriptionId`` | String      | The |pubsub| subscription of the |pubsub| topic.                             |
+--------------------+-------------+------------------------------------------------------------------------------+
| ``MessageId``      | String      | A unique ID for a |pubsub| message.                                          |
+--------------------+-------------+------------------------------------------------------------------------------+

------------
Value Schema
------------

The value is a ``struct`` with the following fields:

+------------------+-----------------+---------------------------------------------------------+
| Field Name       | Schema Type     | Description                                             |
+==================+=================+=========================================================+
| ``MessageData``  | Optional String | The body of the |pubsub| message.                       |
+------------------+-----------------+---------------------------------------------------------+
| ``AttributeMap`` | Optional String | The attribute map associated with the |pubsub| message. |
+------------------+-----------------+---------------------------------------------------------+

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   configuration_options
   changelog