Google Cloud Pub/Sub Source Connector for Confluent Platform¶
The Kafka Connect Google Cloud Pub/Sub Source Connector reads messages from a Pub/Sub topic and writes them to a Kafka topic. The schema for the Kafka record key and value is described in Record Schema. The connector uses a pull strategy: it pulls messages synchronously from a subscription on the Pub/Sub topic.
Features¶
The Google Cloud Pub/Sub Source connector includes the following features:
At least once delivery¶
This connector guarantees that records are delivered to the Kafka topic at least once. If the connector restarts, there may be some duplicate records in the Kafka topic.
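Because each record key carries the Pub/Sub message ID (see Record Schema), a downstream consumer that cannot tolerate duplicates can filter them itself. The following Python sketch is not part of the connector. It assumes records have already been deserialized into dicts with `key` and `value` entries, and it only remembers a bounded window of recent keys, so a duplicate that arrives after its key was evicted is not caught. Pub/Sub message IDs are unique within a topic, which is why the sketch combines ProjectId, TopicId, and MessageId. The class and function names are hypothetical.

```python
from collections import OrderedDict
from typing import Any, Hashable, Iterable, Iterator, Mapping, Tuple


class RecentKeyDeduplicator:
    """Remembers the most recent `capacity` distinct keys (LRU order)."""

    def __init__(self, capacity: int = 100_000) -> None:
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._capacity = capacity
        self._seen: "OrderedDict[Hashable, None]" = OrderedDict()

    def is_new(self, key: Hashable) -> bool:
        """Return True the first time a key is seen within the window."""
        if key in self._seen:
            self._seen.move_to_end(key)  # a repeat refreshes the key's recency
            return False
        self._seen[key] = None
        if len(self._seen) > self._capacity:
            self._seen.popitem(last=False)  # forget the least recently seen key
        return True


def message_identity(record_key: Mapping[str, Any]) -> Tuple[Any, Any, Any]:
    # Pub/Sub message IDs are unique within a topic, so project + topic + ID
    # identifies a message; field names follow the Key Schema table.
    return (record_key["ProjectId"], record_key["TopicId"], record_key["MessageId"])


def drop_duplicates(
    records: Iterable[Mapping[str, Any]], dedup: RecentKeyDeduplicator
) -> Iterator[Mapping[str, Any]]:
    """Yield only records whose message identity has not been seen recently."""
    for record in records:
        if dedup.is_new(message_identity(record["key"])):
            yield record
```

The window size trades memory for coverage: a larger `capacity` catches duplicates that arrive further apart, at the cost of keeping more keys in memory.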
Multiple tasks¶
The Google Cloud Pub/Sub Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve throughput when a large volume of messages needs to be pulled from the subscription.
No ordering guarantees¶
Google Cloud Pub/Sub provides a highly available, scalable message delivery service. The tradeoff for having these properties is that the order in which subscribers receive messages is not guaranteed. As a result, records can be written to the Kafka topic in a different order from the one in which the messages were published.
Fetch Multiple Messages¶
In every poll cycle, the connector fetches up to gcp.pubsub.message.max.count messages. By default, this value is 10000. If your messages are exceptionally large, you may want to reduce it.
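As a rough sizing aid, a batch pulled in one poll cycle occupies about gcp.pubsub.message.max.count multiplied by the average message size in bytes. The Python sketch below picks a count that keeps that product under a memory budget you choose. The budget, the helper name, and the decision to cap the result at the default are assumptions for illustration, and the estimate ignores per-message overhead.

```python
DEFAULT_MAX_COUNT = 10_000  # default of gcp.pubsub.message.max.count


def suggest_max_count(
    avg_message_bytes: int, batch_budget_bytes: int, default: int = DEFAULT_MAX_COUNT
) -> int:
    """Largest message count whose estimated batch size fits the budget.

    Estimate: batch size ~= count * average message size (overhead ignored).
    The result never exceeds the connector default and is at least 1.
    """
    if avg_message_bytes <= 0 or batch_budget_bytes <= 0:
        raise ValueError("sizes must be positive")
    return max(1, min(default, batch_budget_bytes // avg_message_bytes))
```

For example, with 1 MiB messages and a 64 MiB budget, `suggest_max_count(1024 * 1024, 64 * 1024 * 1024)` returns 64, while 1 KB messages under the same budget keep the default of 10000.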
Note
When creating the subscription for the Pub/Sub topic, set the acknowledgement deadline high enough to avoid duplicate records in the Kafka topic. This gives the connector time to commit the records and send an acknowledgement for each Pub/Sub message it processed. If the connector fails to write records to the Kafka topic, the messages are not acknowledged and Pub/Sub makes them available again.
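One way to choose that value is to measure how long a batch takes from being pulled to being acknowledged and add headroom. The Python sketch below is a hypothetical rule of thumb, not a Confluent recommendation: it multiplies the slowest observed batch time by a safety factor (2 by default) and clamps the result to the 10 to 600 second range that Pub/Sub accepts for a subscription's acknowledgement deadline. The measurements themselves are assumed to come from your own monitoring.

```python
import math
from typing import Sequence

MIN_ACK_DEADLINE_S = 10   # smallest acknowledgement deadline Pub/Sub accepts
MAX_ACK_DEADLINE_S = 600  # largest acknowledgement deadline Pub/Sub accepts


def suggest_ack_deadline(batch_seconds: Sequence[float], safety_factor: float = 2.0) -> int:
    """Whole-second deadline covering the slowest observed batch with headroom.

    `batch_seconds` are hypothetical measurements of how long it took from
    pulling a batch to acknowledging it; the result is clamped to Pub/Sub's range.
    """
    if not batch_seconds:
        raise ValueError("need at least one measurement")
    if safety_factor < 1.0:
        raise ValueError("safety_factor must be at least 1")
    wanted = math.ceil(max(batch_seconds) * safety_factor)
    return min(MAX_ACK_DEADLINE_S, max(MIN_ACK_DEADLINE_S, wanted))
```

For example, `suggest_ack_deadline([3.2, 4.0, 7.5])` returns 15. Batches that exceed the 600-second ceiling even with headroom are a sign that gcp.pubsub.message.max.count may be too high.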
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license and Confluent License Properties for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Google Cloud Pub/Sub Source Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the Google Cloud Pub/Sub Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.
Java 1.8.
A GCP service account that can access the Pub/Sub project. You can create this service account in the Google Cloud Console.
You must grant the Google Cloud Pub/Sub Source connector the roles/pubsub.subscriber role in Google Cloud for it to start. For more information, see the list of Pub/Sub IAM roles.
An installation of the latest connector version.
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-gcp-pubsub:latest
You can install a specific version by replacing latest with a version number as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-gcp-pubsub:1.2.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Quick Start¶
This quick start uses the Google Cloud Pub/Sub Source connector to read messages from Pub/Sub and write them to a Kafka topic. You must download the Confluent Platform before starting the steps below.
Start Confluent¶
Start the Confluent services using the following Confluent CLI command:
confluent local services start
Important
Do not use the Confluent CLI in production environments.
Set up Credentials¶
Create a service account and service account key under the GCP project.
- Open the IAM & Admin page in the GCP Console.
- Select your project and click Continue.
- In the left navigation panel, click Service accounts.
- In the top toolbar, click Create Service Account.
- Enter the service account name and description; for example, test-service-account.
- Click Create and, on the next page, select the role Pub/Sub Subscriber under Pub/Sub.
- On the next page, click Create Key and download the JSON file.
- For this quick start, save the file under your $HOME directory and name it credentials.json.
For more information on service account keys, see the Google documentation.
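Before pointing the connector at the key file, you can sanity-check it. The Python sketch below only inspects a few fields found in service account key files (type, project_id, private_key, client_email); the field list is an assumption chosen for a basic check, not an exhaustive schema. It does not verify that the key is valid or that the account holds the Pub/Sub Subscriber role, and the helper names are hypothetical.

```python
import json
from pathlib import Path
from typing import Any, Dict

# Fields present in a Google service account key file; a minimal subset
# chosen for a sanity check, not the full schema.
REQUIRED_FIELDS = ("type", "project_id", "private_key", "client_email")


def validate_key_data(data: Dict[str, Any]) -> str:
    """Return the service account email, or raise ValueError if fields look wrong."""
    missing = [field for field in REQUIRED_FIELDS if not data.get(field)]
    if missing:
        raise ValueError("missing fields: " + ", ".join(missing))
    if data["type"] != "service_account":
        raise ValueError(f"expected type 'service_account', got {data['type']!r}")
    return data["client_email"]


def validate_key_file(path: Path) -> str:
    """Load a JSON key file from disk and validate its contents."""
    return validate_key_data(json.loads(path.read_text(encoding="utf-8")))
```

For the location used in this quick start, call `validate_key_file(Path.home() / "credentials.json")`; it returns the service account's email address if the basic fields are present.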
Publish messages to Pub/Sub topic¶
Before publishing messages, create a Pub/Sub topic and a subscription to it.
Make sure you have permission to create Pub/Sub topics and subscriptions. If you haven't used the gcloud CLI before, initialize the Google Cloud CLI with gcloud init. For more information, review Quickstart using the gcloud command-line tool.
Create a Pub/Sub topic called topic-1
gcloud pubsub topics create topic-1
Create a Pub/Sub subscription called subscription-1
gcloud pubsub subscriptions create --topic topic-1 subscription-1 --ack-deadline 20
Publish three messages to topic-1
gcloud pubsub topics publish topic-1 --message "Peter"
gcloud pubsub topics publish topic-1 --message "Megan"
gcloud pubsub topics publish topic-1 --message "Erin"
Property-based example¶
Create a pubsub-source.properties file with the following contents. This configuration is typically used with standalone workers.
name=pubsub-source
connector.class=io.confluent.connect.gcp.pubsub.PubSubSourceConnector
tasks.max=1
kafka.topic=pubsub-topic
gcp.pubsub.project.id=project-1
gcp.pubsub.topic.id=topic-1
gcp.pubsub.subscription.id=subscription-1
gcp.pubsub.credentials.path=/home/some_directory/credentials.json
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Run the connector with this configuration.
confluent local services connect connector load pubsub-source --config pubsub-source.properties
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status pubsub-source
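A typo in a property name usually surfaces only when the connector fails to start. The Python sketch below catches the simplest case by checking a properties file for the keys used in the example above. It is a simplified parser (one key=value pair per line, no escapes or line continuations), and the list of expected keys is an assumption copied from this example rather than the connector's definitive set of required properties; the Configuration Reference is the authority on that. The helper names are hypothetical.

```python
from typing import Dict, Iterable, List

# Keys copied from the example configuration above; consult the connector's
# Configuration Reference for the properties that are actually required.
EXPECTED_KEYS = (
    "name",
    "connector.class",
    "kafka.topic",
    "gcp.pubsub.project.id",
    "gcp.pubsub.topic.id",
    "gcp.pubsub.subscription.id",
)


def parse_simple_properties(text: str) -> Dict[str, str]:
    """Parse key=value lines; skips blanks and # or ! comments (no escapes or continuations)."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {raw!r}")
        props[key.strip()] = value.strip()
    return props


def missing_keys(props: Dict[str, str], expected: Iterable[str] = EXPECTED_KEYS) -> List[str]:
    """Return expected keys that are absent or empty, in the order listed."""
    return [key for key in expected if not props.get(key)]
```

For the file above, `missing_keys(parse_simple_properties(Path("pubsub-source.properties").read_text()))` (with `from pathlib import Path`) returns an empty list.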
REST-based example¶
Use this setting with distributed workers. Write the following JSON to pubsub-source.json, configure all of the required values, and post the configuration to one of the distributed Connect workers using the commands below. For more information, see the Kafka Connect REST API documentation.
{
"name" : "pubsub-source",
"config" : {
"connector.class" : "io.confluent.connect.gcp.pubsub.PubSubSourceConnector",
"tasks.max" : "1",
"kafka.topic" : "pubsub-topic",
"gcp.pubsub.project.id" : "project-1",
"gcp.pubsub.topic.id" : "topic-1",
"gcp.pubsub.subscription.id" : "subscription-1",
"gcp.pubsub.credentials.path" : "/home/some_directory/credentials.json",
"confluent.topic.bootstrap.servers" : "localhost:9092",
"confluent.topic.replication.factor" : "1"
}
}
Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @pubsub-source.json http://localhost:8083/connectors
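To update the configuration of an existing connector, send a PUT request. This endpoint expects only the flat configuration map (the contents of the "config" object), not the name/config wrapper used for the POST request, so save that object to a separate file, such as pubsub-source-config.json, and use the following command:
curl -s -X PUT -H 'Content-Type: application/json' --data @pubsub-source-config.json http://localhost:8083/connectors/pubsub-source/config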
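The same POST and PUT calls can be scripted. The Python sketch below uses only the standard library; the worker URL and the helper names are assumptions. It sends the full name/config document to POST /connectors, but only the inner config map to PUT /connectors/<name>/config, which is the body format the Kafka Connect REST API expects for that endpoint.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict

JSON_HEADERS = {"Content-Type": "application/json"}


def build_create_request(worker_url: str, connector: Dict[str, Any]) -> urllib.request.Request:
    """POST /connectors with the {"name": ..., "config": {...}} document."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        worker_url.rstrip("/") + "/connectors",
        data=body,
        headers=JSON_HEADERS,
        method="POST",
    )


def build_update_request(worker_url: str, connector: Dict[str, Any]) -> urllib.request.Request:
    """PUT /connectors/<name>/config with only the flat config map."""
    name = urllib.parse.quote(connector["name"], safe="")
    body = json.dumps(connector["config"]).encode("utf-8")
    return urllib.request.Request(
        f"{worker_url.rstrip('/')}/connectors/{name}/config",
        data=body,
        headers=JSON_HEADERS,
        method="PUT",
    )


def send(request: urllib.request.Request) -> Dict[str, Any]:
    """Send the request and decode the worker's JSON response."""
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))
```

Keeping request construction separate from `send` lets you inspect or log the exact URL and body before anything reaches a worker. To create the connector from the file above, load it with `json.load` and pass it to `send(build_create_request("http://localhost:8083", connector))`.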
To consume the records that the connector writes to the configured Kafka topic, run the following command:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic pubsub-topic --from-beginning
Record Schema¶
Each Pub/Sub message is converted into exactly one Kafka record with the following structure:
- The Kafka key consists of the project ID, topic ID, subscription ID, and message ID of the Pub/Sub message.
- The Kafka value consists of the message data and message attributes of the Pub/Sub message.
Key Schema¶
The key is a struct with the following fields:
Field Name | Schema Type | Description |
---|---|---|
ProjectId | String | The Pub/Sub project containing the topic from which messages are polled. |
TopicId | String | The Pub/Sub topic containing the messages. |
SubscriptionId | String | The Pub/Sub subscription of the Pub/Sub topic. |
MessageId | String | A unique ID for the Pub/Sub message. |
Value Schema¶
The value is a struct with the following fields:
Field Name | Schema Type | Description |
---|---|---|
MessageData | Optional String | The body of the Pub/Sub message. |
AttributeMap | Optional String | The attribute map associated with the Pub/Sub message. |
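To make the mapping concrete, the Python sketch below models one Pub/Sub message as the key and value structs described in the tables, reusing their field names. It illustrates the schema and is not the connector's code. It assumes the message body is UTF-8 text, and because the tables do not say how the attribute map is rendered as a string, it takes AttributeMap as an already-formatted string. The `to_record` helper is hypothetical.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class PubSubKey:
    # Field names mirror the Key Schema table.
    ProjectId: str
    TopicId: str
    SubscriptionId: str
    MessageId: str


@dataclass(frozen=True)
class PubSubValue:
    # Field names mirror the Value Schema table; both fields are optional.
    MessageData: Optional[str]
    AttributeMap: Optional[str]


def to_record(
    project_id: str,
    topic_id: str,
    subscription_id: str,
    message_id: str,
    data: Optional[bytes],
    attribute_map: Optional[str],
) -> Tuple[PubSubKey, PubSubValue]:
    """Build one key/value pair for one Pub/Sub message."""
    # Assumption: the payload is UTF-8 text; the connector's decoding is not described here.
    text = data.decode("utf-8") if data is not None else None
    key = PubSubKey(project_id, topic_id, subscription_id, message_id)
    return key, PubSubValue(text, attribute_map)
```

For one of the quick start messages, `to_record("project-1", "topic-1", "subscription-1", "123", b"Peter", None)` yields a value whose MessageData is "Peter" and whose AttributeMap is None; the message ID "123" is a made-up placeholder.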
Troubleshooting¶
Too many duplicates¶
If there are too many duplicate messages in the destination Kafka topic, increase the acknowledgement deadline of the subscription. In the Google Cloud Console, open the Pub/Sub subscriptions page, select the subscription used by this connector, and increase Acknowledgement deadline (for example, from 10 to 20 seconds). Alternatively, you can use the following gcloud command. For more information on the Pub/Sub gcloud CLI, see the Google documentation.
gcloud pubsub subscriptions update <subscription> --ack-deadline=<seconds>