Kafka Connect Google Cloud Pub/Sub Source Connector

The Google Cloud Pub/Sub source connector reads messages from a Pub/Sub topic and writes them to a Kafka topic. The schemas for the Kafka record key and value are described in Record Schema. The connector uses a pull strategy to get messages from the Pub/Sub topic, and the messages are pulled synchronously.
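One synchronous pull cycle can be pictured roughly as follows. This is a minimal sketch, not the connector's implementation. It assumes the synchronous `pull` and `acknowledge` methods of the `google-cloud-pubsub` Python client's `SubscriberClient`, and it assumes the write-then-acknowledge order described in the note on acknowledgement deadlines below. `write_to_kafka` is a hypothetical callback standing in for the connector's Kafka write, and `max_messages` plays the role of `gcp.pubsub.message.max.count`.

```python
from typing import Callable, List


def pull_once(subscriber, subscription_path: str, max_messages: int,
              write_to_kafka: Callable[[List[bytes]], None]) -> int:
    """Run one synchronous pull cycle and return the number of messages handled.

    `subscriber` is expected to behave like google.cloud.pubsub_v1.SubscriberClient.
    `write_to_kafka` is a hypothetical callback that raises if the write fails.
    """
    # Blocking pull: returns at most `max_messages` messages (possibly fewer).
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": max_messages}
    )
    received = list(response.received_messages)
    if not received:
        return 0

    # Write first. If this raises, nothing is acknowledged, and Pub/Sub
    # redelivers the messages once their acknowledgement deadline expires.
    write_to_kafka([m.message.data for m in received])

    # Acknowledge only after the write succeeded (at-least-once semantics).
    subscriber.acknowledge(
        request={"subscription": subscription_path,
                 "ack_ids": [m.ack_id for m in received]}
    )
    return len(received)
```

With the real library, `subscriber` would be `pubsub_v1.SubscriberClient()` and `subscription_path` would come from its `subscription_path("project-1", "subscription-1")` method. Passing the client in as a parameter keeps the cycle testable without Google Cloud credentials.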

Features

The Google Cloud Pub/Sub Source connector offers the following features:

  • At Least Once Delivery: The connector guarantees that messages from Pub/Sub are delivered at least once to the Kafka topic.
  • No Ordering Guarantees: Google Cloud Pub/Sub provides a highly available, scalable message delivery service. The tradeoff for these properties is that the order in which subscribers receive messages is not guaranteed, so the records written to the Kafka topic may end up in a different order from the one in which the messages were published.
  • Fetch Multiple Messages: In every poll cycle, the connector fetches up to gcp.pubsub.message.max.count messages. By default, this value is 10000. If your messages are exceptionally large, you may want to reduce this value.
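To see why a smaller value helps with large messages, consider the worst-case payload of one poll batch, which is roughly the message count times the average message size. The sketch below ignores per-message overhead and client-side buffering; the helper names and the memory budget are illustrative assumptions, not connector settings.

```python
MIB = 1024 * 1024


def worst_case_batch_bytes(max_count: int, avg_message_bytes: int) -> int:
    """Approximate upper bound on the payload bytes fetched in one poll cycle."""
    return max_count * avg_message_bytes


def max_count_for_budget(budget_bytes: int, avg_message_bytes: int) -> int:
    """Largest message count whose batch payload fits in a memory budget (at least 1)."""
    return max(1, budget_bytes // avg_message_bytes)


# Default of 10000 messages at 1 MiB each: about 9.8 GiB of payload per poll.
print(worst_case_batch_bytes(10000, 1 * MIB) / 1024**3)

# With a hypothetical 256 MiB budget, 1 MiB messages allow at most 256 per poll.
print(max_count_for_budget(256 * MIB, 1 * MIB))
```

Under these assumptions you would then set gcp.pubsub.message.max.count to the computed value in the connector configuration.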

Note

When you create the subscription for the Pub/Sub topic, set the acknowledgement deadline high enough to avoid duplicate records in the Kafka topic. A sufficiently long deadline gives the connector time to commit the records and send an acknowledgement for each processed Pub/Sub message before Pub/Sub redelivers it. If the connector fails to write records to the Kafka topic, the corresponding messages are made available again in Pub/Sub.
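Because delivery is at least once, a downstream consumer may still see the same Pub/Sub message twice. One way to tolerate this is to skip records whose message ID has already been seen. The sketch below assumes that records are already deserialized into dictionaries shaped like the key and value structs in Record Schema. The in-memory set is a simplification; a long-running consumer would need bounded or persistent storage.

```python
from typing import Dict, Iterable, Iterator, Set, Tuple

Record = Tuple[Dict[str, str], Dict[str, str]]  # (key struct, value struct)


def deduplicate(records: Iterable[Record]) -> Iterator[Record]:
    """Yield each record whose (ProjectId, TopicId, MessageId) has not been seen yet."""
    seen: Set[Tuple[str, str, str]] = set()
    for key, value in records:
        # Pub/Sub message IDs are unique within a topic, so scope them by project and topic.
        ident = (key["ProjectId"], key["TopicId"], key["MessageId"])
        if ident in seen:
            continue  # redelivered duplicate: drop it
        seen.add(ident)
        yield key, value
```

A redelivered message keeps its original message ID, so it lands on the `continue` branch while the first copy passes through unchanged.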

Prerequisites

The following are required to run the Kafka Connect Google Cloud Pub/Sub Source Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
  • Java 1.8

Install Google Cloud Pub/Sub Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run this command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-gcp-pubsub:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-gcp-pubsub:1.0.0-preview

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Quick Start

This quick start uses the Google Cloud Pub/Sub Source Connector to read messages from Pub/Sub and write them to a Kafka topic.

Start Confluent

Start the Confluent services using the following Confluent CLI command:

confluent start

Important

Do not use the Confluent CLI in production environments.

Publish messages to a Pub/Sub topic

Before publishing messages, create a Pub/Sub topic and a subscription to it; Pub/Sub delivers to a subscription only the messages published after that subscription was created. Make sure you have the permissions required to create Pub/Sub topics and subscriptions. For details, see the Pub/Sub quickstart using the gcloud command-line tool.

Create a Pub/Sub topic called topic-1

gcloud pubsub topics create topic-1

Create a Pub/Sub subscription called subscription-1

gcloud pubsub subscriptions create --topic topic-1 subscription-1

Publish three messages to topic-1

gcloud pubsub topics publish topic-1 --message "Peter"
gcloud pubsub topics publish topic-1 --message "Megan"
gcloud pubsub topics publish topic-1 --message "Erin"
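The same three messages can also be published from Python with the `google-cloud-pubsub` client library. In this sketch the publisher is passed in as a parameter so the function can be exercised without Google Cloud credentials; with the real library you would pass `pubsub_v1.PublisherClient()`. The function name `publish_messages` is illustrative.

```python
from typing import List


def publish_messages(publisher, project_id: str, topic_id: str,
                     messages: List[str]) -> List[str]:
    """Publish each string as a UTF-8 Pub/Sub message and return the server-assigned IDs.

    `publisher` is expected to behave like google.cloud.pubsub_v1.PublisherClient.
    """
    topic_path = publisher.topic_path(project_id, topic_id)
    # publish() is asynchronous and returns a future; the data must be bytes.
    futures = [publisher.publish(topic_path, text.encode("utf-8")) for text in messages]
    # result() blocks until Pub/Sub has accepted the message and returns its ID.
    return [future.result() for future in futures]
```

For example, `publish_messages(pubsub_v1.PublisherClient(), "project-1", "topic-1", ["Peter", "Megan", "Erin"])` publishes the same three messages as the gcloud commands above.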

Property-based example

Create a pubsub-source.properties file with the following contents. This configuration is typically used with standalone workers.

name=pubsub-source
connector.class=io.confluent.connect.gcp.pubsub.PubSubSourceConnector
tasks.max=1
kafka.topic=pubsub-topic

gcp.pubsub.project.id=project-1
gcp.pubsub.topic.id=topic-1
gcp.pubsub.subscription.id=subscription-1
gcp.pubsub.credentials.path=/home/some_directory/credentials.json

confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

Run the connector with this configuration.

confluent load pubsub-source -d pubsub-source.properties

Confirm that the connector is in a RUNNING state.

confluent status pubsub-source
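The connector state can also be checked through the Kafka Connect REST API, which returns it from `GET /connectors/<name>/status`. The sketch below uses only the Python standard library and assumes a worker at http://localhost:8083; the helper names are illustrative. It treats the connector as healthy only if the connector and all of its tasks (at least one) report RUNNING.

```python
import json
import urllib.request


def is_running(status: dict) -> bool:
    """True if the connector and all of its tasks (at least one) are RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    return connector_ok and bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """Fetch /connectors/<name>/status from a Kafka Connect worker (makes a network call)."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)
```

For example, `is_running(fetch_status("pubsub-source"))` returns True once the connector and its single task (tasks.max=1) have started.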

REST-based example

Use this configuration with distributed workers. Write the following JSON to pubsub-source.json, fill in all of the required values, and post the configuration to one of the distributed Connect workers with the command below. For more information, see the Kafka Connect REST API documentation.

{
    "name" : "pubsub-source",
    "config" : {
       "connector.class" : "io.confluent.connect.gcp.pubsub.PubSubSourceConnector",
       "tasks.max" : "1",
       "kafka.topic" : "pubsub-topic",
       "gcp.pubsub.project.id" : "project-1",
       "gcp.pubsub.topic.id" : "topic-1",
       "gcp.pubsub.subscription.id" : "subscription-1",
       "gcp.pubsub.credentials.path" : "/home/some_directory/credentials.json",
       "confluent.topic.bootstrap.servers" : "localhost:9092",
       "confluent.topic.replication.factor" : "1"
    }
}
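The JSON above carries the same settings as pubsub-source.properties from the property-based example, with every value as a string. The sketch below derives one from the other. It handles only simple `key=value` lines and `#` or `!` comments, not the full Java properties syntax (line continuations, escapes, `:` separators); that limitation is an intentional simplification.

```python
import json
from typing import Dict


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and # or ! comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def to_rest_payload(props: Dict[str, str]) -> str:
    """Build the {"name": ..., "config": {...}} body expected by POST /connectors."""
    config = dict(props)
    name = config.pop("name")  # the connector name sits outside the config map
    return json.dumps({"name": name, "config": config}, indent=4)
```

Reading pubsub-source.properties, passing its text through `parse_properties` and then `to_rest_payload`, and writing the result to pubsub-source.json yields a document equivalent to the JSON above.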

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

curl -s -X POST -H 'Content-Type: application/json' --data @pubsub-source.json http://localhost:8083/connectors

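Use the following command to update the configuration of an existing connector. The PUT endpoint expects only the configuration map, so save the contents of the "config" object (without the surrounding "name" and "config" wrapper) to a separate file, for example pubsub-source-config.json.

curl -s -X PUT -H 'Content-Type: application/json' --data @pubsub-source-config.json http://localhost:8083/connectors/pubsub-source/config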
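For scripting without curl, the two calls can be built with Python's standard `urllib.request`. The sketch below constructs the requests without sending them; the POST body wraps the settings in `name` and `config`, while the PUT body sent to `/connectors/<name>/config` is the bare configuration map. The worker URL and the function name are assumptions.

```python
import json
import urllib.request
from typing import Dict


def build_requests(name: str, config: Dict[str, str],
                   base_url: str = "http://localhost:8083"):
    """Return (create, update) Request objects for the Kafka Connect REST API."""
    headers = {"Content-Type": "application/json"}
    # POST /connectors creates the connector; the body holds the name and the config.
    create = urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps({"name": name, "config": config}).encode("utf-8"),
        headers=headers, method="POST")
    # PUT /connectors/<name>/config creates or updates; the body is only the config map.
    update = urllib.request.Request(
        f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers=headers, method="PUT")
    return create, update
```

Passing either request to `urllib.request.urlopen` sends it to the worker.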

To consume the records that the connector writes to the configured Kafka topic, run the following command:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081  --topic pubsub-topic --from-beginning

Record Schema

Each Pub/Sub message is converted into exactly one Kafka record, with the following structure:

  1. The Kafka key consists of the project ID, topic ID, subscription ID, and message ID of the Pub/Sub message.
  2. Optionally, attributes from the attribute map of the Pub/Sub message can also be included in the Kafka key.
  3. The Kafka value consists of the message data and message attributes of the Pub/Sub message.

Key Schema

The key is a struct with the following fields:

Field Name      Schema Type  Description
ProjectId       String       The Pub/Sub project containing the topic from which messages are polled.
TopicId         String       The Pub/Sub topic containing the messages.
SubscriptionId  String       The Pub/Sub subscription of the Pub/Sub topic.
MessageId       String       A unique ID for a Pub/Sub message.

Value Schema

The value is a struct with the following fields:

Field Name    Schema Type      Description
MessageData   Optional String  The body of the Pub/Sub message.
AttributeMap  Optional String  The attribute map associated with the Pub/Sub message.
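The mapping from a Pub/Sub message to the key and value structs in the tables above can be sketched as a plain function. The input dictionary shape, the function name, and rendering the attribute map as a JSON string are assumptions made for illustration; the tables only state that AttributeMap is an optional string. The optional copying of attributes into the key is omitted.

```python
import json
from typing import Dict, Optional, Tuple


def to_kafka_record(project_id: str, topic_id: str, subscription_id: str,
                    message: Dict) -> Tuple[Dict[str, str], Dict[str, Optional[str]]]:
    """Build (key, value) structs following the Key Schema and Value Schema tables.

    `message` is a hypothetical dict with "message_id", optional "data" (bytes)
    and optional "attributes" (dict of str to str).
    """
    key = {
        "ProjectId": project_id,
        "TopicId": topic_id,
        "SubscriptionId": subscription_id,
        "MessageId": message["message_id"],
    }
    data = message.get("data")
    attributes = message.get("attributes")
    value = {
        # Both value fields are optional, so absent parts map to None.
        "MessageData": data.decode("utf-8") if data is not None else None,
        # Assumption: the attribute map is rendered as a JSON string with sorted keys.
        "AttributeMap": json.dumps(attributes, sort_keys=True) if attributes else None,
    }
    return key, value
```

A message without attributes yields a value whose AttributeMap is None, matching the Optional String schema type.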

Additional Documentation