Google Cloud Dataproc Sink Connector for Confluent Platform

The Kafka Connect Google Cloud Dataproc Sink connector integrates Apache Kafka® with managed HDFS instances in Google Cloud Dataproc.

The connector periodically polls data from Kafka and writes it to HDFS. The data from each Kafka topic is partitioned by the provided partitioner and divided into chunks. Each chunk of data is represented as an HDFS file. The HDFS filename combines the topic, the Kafka partition, and the start and end offsets of the data chunk.

If no partitioner is specified in the configuration, the default partitioner that preserves the Kafka partitioning is used. The size of each data chunk is determined by the number of records written to HDFS, the time the data was written to HDFS, and schema compatibility.
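
As a rough illustration of how record count, elapsed time, and schema changes could each end a chunk, the sketch below decides when the current chunk should be committed as a file. It is not the connector's code: the function and all parameter names are hypothetical. flush_size mirrors the flush.size setting that appears in the quick start, and rotate_interval_ms stands in for a time-based rotation setting that is assumed here, not confirmed by this page.

```python
from typing import Optional


def should_close_chunk(
    records_in_chunk: int,
    flush_size: int,
    ms_since_chunk_opened: int,
    rotate_interval_ms: Optional[int],
    schema_changed: bool,
) -> bool:
    """Return True when the current chunk should be committed as an HDFS file."""
    # A schema change that cannot be absorbed ends the chunk immediately.
    if schema_changed:
        return True
    # Enough records have accumulated (hypothetical mirror of flush.size).
    if records_in_chunk >= flush_size:
        return True
    # Optional time-based limit (assumed setting, disabled when None).
    if rotate_interval_ms is not None and ms_since_chunk_opened >= rotate_interval_ms:
        return True
    return False
```

With flush_size set to 3, as in the quick start, the third record would close the chunk even if no time limit is configured.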

The Kafka Connect Google Cloud Dataproc connector integrates with Hive. When Hive integration is enabled, the connector automatically creates an external Hive partitioned table for each Kafka topic and updates the table according to the available data in HDFS.

Features

The Dataproc connector offers the features provided by the HDFS 2 Sink connector for Confluent Platform. In addition, the following features are provided specifically for managed HDFS clusters on Google Cloud Dataproc:

At least once delivery

This connector guarantees that records are delivered at least once from the Kafka topic.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The Google Cloud Dataproc Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when multiple files need to be parsed.

Seamless Dataproc integration

Provide the connector with GCP credentials and the Dataproc cluster name, region, and project. You do not need to work out the HDFS URL or adjust the Hadoop configuration.

Supporting HA clusters

No special configuration is required to connect to a multi-master HA cluster.

HTTP proxy

Connecting to Dataproc through an HTTP proxy, with or without basic authentication, is supported.

Limitations

The connector does not currently support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:

  • io.debezium.transforms.ByLogicalTableRouter
  • io.debezium.transforms.outbox.EventRouter
  • org.apache.kafka.connect.transforms.RegexRouter
  • org.apache.kafka.connect.transforms.TimestampRouter
  • io.confluent.connect.transforms.MessageTimestampRouter
  • io.confluent.connect.transforms.ExtractTopic$Key
  • io.confluent.connect.transforms.ExtractTopic$Value
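
One way to catch these transformations before deploying is to scan the connector configuration for them. The sketch below assumes the usual Kafka Connect convention of a comma-separated transforms list with a transforms.<alias>.type entry for each alias; the function name and the example configuration are hypothetical.

```python
# Transformation classes listed above as not allowed for this connector.
DISALLOWED_TRANSFORMS = {
    "io.debezium.transforms.ByLogicalTableRouter",
    "io.debezium.transforms.outbox.EventRouter",
    "org.apache.kafka.connect.transforms.RegexRouter",
    "org.apache.kafka.connect.transforms.TimestampRouter",
    "io.confluent.connect.transforms.MessageTimestampRouter",
    "io.confluent.connect.transforms.ExtractTopic$Key",
    "io.confluent.connect.transforms.ExtractTopic$Value",
}


def find_disallowed_transforms(config: dict) -> list:
    """Return (alias, class) pairs for configured SMTs that are on the list."""
    aliases = [a.strip() for a in config.get("transforms", "").split(",") if a.strip()]
    found = []
    for alias in aliases:
        smt_class = config.get(f"transforms.{alias}.type", "")
        if smt_class in DISALLOWED_TRANSFORMS:
            found.append((alias, smt_class))
    return found


# Hypothetical configuration with one disallowed and one allowed transformation.
example = {
    "transforms": "route,mask",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
}
print(find_disallowed_transforms(example))
```

The check only matches the class names listed above. A custom transformation that changes the topic name would not be detected by it.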

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and Confluent License Properties for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Google Cloud Dataproc Sink Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Install the Google Cloud Dataproc Sink Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.

  • Connect: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.

  • An installation of the Confluent CLI.

  • Java 1.8.

  • Google Cloud Platform (GCP) Account.

  • A GCP project with billing enabled (steps here).

  • A Dataproc cluster created following steps here. The cluster must be deployed with image version 1.4.x or later. Take note of the region and cluster name. They will be used in the connector config.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-gcp-dataproc-sink:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-gcp-dataproc-sink:1.2.0
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Quick Start

This quick start uses the Dataproc connector to export data produced by the Avro console producer to HDFS in a Dataproc managed cluster.

Set up credentials

Create a service account and service account key under the GCP project.

  1. Open the IAM & Admin page in the GCP Console.
  2. Select your project and click Continue.
  3. In the left navigation panel, click Service accounts.
  4. In the top toolbar, click Create Service Account.
  5. Enter the service account name and description; for example test-service-account.
  6. Click Create and on the next page select the role Dataproc Administrator under Dataproc and the role Storage Object Viewer under Storage.
  7. On the next page click Create Key and download the JSON file.
  8. For this quickstart, save the file under your $home directory and name it credentials.json.

For more information on service account keys, see the Google documentation.

Load the Connector

This quick start assumes that security is not configured for HDFS and Hive metastore. To make the necessary security configurations, see Secure HDFS and Hive Metastore.

First, start all the necessary services using the Confluent CLI.

Tip

  • If not already in your PATH, add Confluent’s bin directory by running: export PATH=<path-to-confluent>/bin:$PATH
  • Ensure you run the connector somewhere with network access to the Dataproc cluster, such as a Google Compute Engine VM on the same subnet.

confluent local services start

Next, start the Avro console producer to produce a few records to Kafka:

  ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_dataproc \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Then in the console producer, type:

{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}

The three records entered are published to the Kafka topic test_dataproc in Avro format.

Before starting the connector, make sure that the settings in etc/gcp-dataproc-sink-quickstart.properties match your Dataproc setup. For example, replace $home with your home directory path, and replace YOUR-PROJECT-ID, YOUR-CLUSTER-REGION, and YOUR-CLUSTER-NAME with your respective values.
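
A quick pre-flight check can confirm that no placeholder was left in the properties file. The sketch below is a hypothetical helper, not part of the connector or the Confluent CLI. It parses simple key=value lines and reports any value that still contains one of the placeholders named above. The sample text is illustrative; in practice you would read the contents of your own properties file.

```python
PLACEHOLDERS = ("$home", "YOUR-PROJECT-ID", "YOUR-CLUSTER-REGION", "YOUR-CLUSTER-NAME")


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def unresolved_placeholders(props: dict) -> dict:
    """Map each key to the placeholders still present in its value."""
    leftovers = {}
    for key, value in props.items():
        hits = [p for p in PLACEHOLDERS if p in value]
        if hits:
            leftovers[key] = hits
    return leftovers


# Illustrative file contents; two values still need to be filled in.
sample = """
gcp.dataproc.credentials.path=$home/credentials.json
gcp.dataproc.projectId=YOUR-PROJECT-ID
gcp.dataproc.region=us-west1
"""
print(unresolved_placeholders(parse_properties(sample)))
```

An empty result means every placeholder has been replaced. It does not verify that the values themselves are correct.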

Then, start the connector by loading its configuration with the following command.

Caution

You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local services connect connector load dataproc-sink --config etc/gcp-dataproc-sink-quickstart.properties

{
  "name": "dataproc-sink",
  "config": {
    "topics": "test_dataproc",
    "tasks.max": "1",
    "flush.size": "3",
    "connector.class": "io.confluent.connect.gcp.dataproc.DataprocSinkConnector",
    "gcp.dataproc.credentials.path": "/home/user/credentials.json",
    "gcp.dataproc.projectId": "dataproc-project-id",
    "gcp.dataproc.region": "us-west1",
    "gcp.dataproc.cluster": "dataproc-cluster-name",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "name": "dataproc-sink"
  },
  "tasks": [],
  "type": "sink"
}
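
If you are not using the Confluent CLI, the same configuration can in principle be submitted to the Kafka Connect REST interface. The sketch below only builds the request and does not send it. It assumes a Connect worker listening on the default REST port 8083 on localhost; the helper name build_create_request is hypothetical, and the configuration values are the sample values from the output above.

```python
import json
import urllib.request


def build_create_request(name: str, config: dict, connect_url: str = "http://localhost:8083"):
    """Build (but do not send) a POST /connectors request for a Connect worker."""
    payload = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Sample values copied from the quick start output; replace them with your own.
config = {
    "topics": "test_dataproc",
    "tasks.max": "1",
    "flush.size": "3",
    "connector.class": "io.confluent.connect.gcp.dataproc.DataprocSinkConnector",
    "gcp.dataproc.credentials.path": "/home/user/credentials.json",
    "gcp.dataproc.projectId": "dataproc-project-id",
    "gcp.dataproc.region": "us-west1",
    "gcp.dataproc.cluster": "dataproc-cluster-name",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
}
request = build_create_request("dataproc-sink", config)

# To submit the request to a running worker, uncomment the following lines:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Keeping the request construction separate from the network call makes it easy to inspect the payload before anything is sent.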

To check that the connector started successfully, view the Connect worker’s log by running:

confluent local services connect log

Towards the end of the log you should see that the connector starts, logs a few messages, and then exports data from Kafka to HDFS. After the connector finishes ingesting data to HDFS, check that the data is available in HDFS. From the HDFS namenode in Dataproc:

hadoop fs -ls /topics/test_dataproc/partition=0

You should see a file with the name /topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro. The file name is encoded as topic+kafkaPartition+startOffset+endOffset.format.
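
The naming scheme can be reproduced and parsed with a few lines of code, which is handy when scripting checks against the HDFS listing. The sketch below is a hypothetical helper, not connector code. The 10-digit zero padding of the offsets is inferred from the example file name above and is an assumption.

```python
import re

# topic+kafkaPartition+startOffset+endOffset.format
FILENAME_PATTERN = re.compile(
    r"^(?P<topic>.+)\+(?P<partition>\d+)\+(?P<start>\d+)\+(?P<end>\d+)\.(?P<format>\w+)$"
)


def format_filename(topic, partition, start_offset, end_offset, extension="avro"):
    """Build a file name; offsets are zero-padded to 10 digits (assumed width)."""
    return f"{topic}+{partition}+{start_offset:010d}+{end_offset:010d}.{extension}"


def parse_filename(filename):
    """Split a file name back into its topic, partition, offsets, and format."""
    match = FILENAME_PATTERN.match(filename)
    if match is None:
        raise ValueError(f"unexpected file name: {filename!r}")
    return {
        "topic": match["topic"],
        "partition": int(match["partition"]),
        "start_offset": int(match["start"]),
        "end_offset": int(match["end"]),
        "format": match["format"],
    }


name = format_filename("test_dataproc", 0, 0, 2)
info = parse_filename(name)
# Offsets are inclusive, so this file holds end - start + 1 records.
print(name, info["end_offset"] - info["start_offset"] + 1)
```

For the quick start, the three records at offsets 0 through 2 with flush.size set to 3 end up in a single file covering that offset range.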

You can use avro-tools-1.9.1.jar (available in Apache mirrors) to extract the content of the file. Run avro-tools directly on Hadoop as:

  hadoop jar avro-tools-1.9.1.jar tojson \
hdfs://<namenode>/topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro

where “<namenode>” is the HDFS Namenode hostname. Usually, the Namenode hostname is your cluster name with the “-m” suffix attached.

Or, if you experience issues, first copy the avro file from HDFS to the local filesystem and try again with Java:

  hadoop fs -copyToLocal /topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro \
/tmp/test_dataproc+0+0000000000+0000000002.avro

  java -jar avro-tools-1.9.1.jar tojson /tmp/test_dataproc+0+0000000000+0000000002.avro

You should see the following output:

{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}

Finally, stop the Connect worker as well as all the rest of Confluent Platform by running:

confluent local services stop

or stop all the services and additionally wipe out any data generated during this quick start by running:

confluent local destroy

Note

If you want to run the quick start with Hive integration, you need to add the following configurations to etc/gcp-dataproc-sink-quickstart.properties before starting the connector:

hive.integration=true
hive.metastore.uris=thrift://<namenode>:9083
schema.compatibility=BACKWARD

After the connector finishes ingesting data to HDFS, you can use Hive to check the data from your namenode:

hive> SELECT * FROM test_dataproc;

Note

If you leave hive.metastore.uris empty, an embedded Hive metastore is created in the directory where the connector is started. You need to start Hive in that specific directory to query the data.

Configuration

This section gives example configurations that cover common scenarios. For a complete description of the available configuration options, see Configuration Reference for Google Cloud Dataproc Sink Connector for Confluent Platform.

Format and Partitioner

You need to specify the format.class and partitioner.class if you want to write other formats to HDFS or use other partitioners. The following example configuration demonstrates how to write the Parquet format and use the hourly partitioner:

format.class=io.confluent.connect.gcp.dataproc.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.gcp.dataproc.hdfs.partitioner.HourlyPartitioner

Note

If you want to use the field partitioner, you also need to set the partition.field.name configuration to the name of the record field to partition by.
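
To give a feel for what each partitioner does to the output layout, the sketch below computes the directory a record would land in. The /topics/<topic>/partition=<n> layout comes from the quick start on this page. The hourly layout (year=/month=/day=/hour=, computed in UTC) and the field layout (<field>=<value>) are assumptions based on how these partitioners commonly behave and should be checked against your own output. All function names are hypothetical.

```python
from datetime import datetime, timezone


def default_partition_dir(topic, kafka_partition, topics_dir="/topics"):
    """Default partitioner: one directory per Kafka partition."""
    return f"{topics_dir}/{topic}/partition={kafka_partition}"


def hourly_partition_dir(topic, timestamp_ms, topics_dir="/topics"):
    """Hourly partitioner (assumed layout, UTC): one directory per hour."""
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return f"{topics_dir}/{topic}/year={ts:%Y}/month={ts:%m}/day={ts:%d}/hour={ts:%H}"


def field_partition_dir(topic, record, field_name, topics_dir="/topics"):
    """Field partitioner (assumed layout): one directory per field value."""
    return f"{topics_dir}/{topic}/{field_name}={record[field_name]}"


print(default_partition_dir("test_dataproc", 0))
print(hourly_partition_dir("test_dataproc", 0))
print(field_partition_dir("test_dataproc", {"f1": "value1"}, "f1"))
```

Here field_name plays the role of the partition.field.name setting. A field with many distinct values produces many small directories, which is worth keeping in mind when choosing it.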

Hive Integration

At minimum, you need to specify hive.integration, hive.metastore.uris and schema.compatibility when integrating Hive. Here is an example configuration:

hive.integration=true
hive.metastore.uris=thrift://<namenode>:9083
schema.compatibility=BACKWARD

You should adjust the hive.metastore.uris according to your Hive configurations.

Note

If you don’t specify hive.metastore.uris, the connector uses a local metastore with Derby in the directory from which the connector is run. You need to run Hive in this directory in order to see the Hive metadata change.

Also, to support schema evolution, set the schema.compatibility to be BACKWARD, FORWARD, or FULL. This ensures that Hive can query the data written to HDFS with different schemas using the latest Hive table schema. For more information on schema compatibility, see Schema Evolution.
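
The rules in this section can be checked mechanically before the connector is started. The sketch below is a hypothetical validator, not part of the connector. It takes the properties as a plain dictionary and reports the problems described above: an incompatible schema.compatibility value when Hive integration is on, and an empty hive.metastore.uris.

```python
HIVE_COMPATIBLE_MODES = {"BACKWARD", "FORWARD", "FULL"}


def check_hive_settings(props: dict) -> list:
    """Return a list of human-readable problems; empty when nothing is flagged."""
    problems = []
    if props.get("hive.integration", "false").lower() != "true":
        return problems  # Hive integration is off, nothing to check.
    mode = props.get("schema.compatibility", "NONE").upper()
    if mode not in HIVE_COMPATIBLE_MODES:
        problems.append(
            f"schema.compatibility is {mode}; use BACKWARD, FORWARD, or FULL with Hive"
        )
    if not props.get("hive.metastore.uris"):
        problems.append(
            "hive.metastore.uris is empty; a local Derby metastore would be used"
        )
    return problems


print(check_hive_settings({
    "hive.integration": "true",
    "hive.metastore.uris": "thrift://<namenode>:9083",
    "schema.compatibility": "BACKWARD",
}))
```

An empty metastore URI is reported as a problem here only as a reminder. It is a supported setup, as the note above explains.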

Secure HDFS and Hive Metastore

To work with secure HDFS and Hive metastore, you need to specify hdfs.authentication.kerberos, connect.hdfs.principal, connect.hdfs.keytab, and hdfs.namenode.principal:

hdfs.authentication.kerberos=true
connect.hdfs.principal=connect-hdfs/_HOST@YOUR-REALM.COM
connect.hdfs.keytab=path to the connector keytab
hdfs.namenode.principal=namenode principal

You need to create the Kafka Connect principals and keytab files using Kerberos, distribute the keytab file to all hosts that run the connector, and ensure that only the connector user has read access to the keytab file.

Note

Currently, the connector requires the principal and the keytab path to be the same on all the hosts running the connector. The host part of the hdfs.namenode.principal needs to be the actual FQDN of the Namenode host instead of the _HOST placeholder.

Schema Evolution

The Dataproc connector supports schema evolution and reacts to schema changes in the data according to the schema.compatibility configuration. This section explains how the connector reacts to schema evolution under different values of schema.compatibility. You can set schema.compatibility to NONE, BACKWARD, FORWARD, or FULL, which mean no compatibility, backward compatibility, forward compatibility, and full compatibility, respectively.

  • NO Compatibility: By default, the schema.compatibility is set to NONE. In this case, the connector ensures that each file written to HDFS has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with new schema in new files.

  • BACKWARD Compatibility: If a schema is evolved in a backward compatible manner, you can always use the latest schema to query all the data uniformly. For example, removing fields is a backward-compatible change to a schema, since when the connector encounters records written with the old schema that contains these fields it can just ignore them. Adding a field with a default value is also backward compatible.

    If BACKWARD is specified in the schema.compatibility, the connector keeps track of the latest schema used in writing data to HDFS. If a data record arrives with a schema version larger than the current latest schema, the connector commits the current set of files and writes the data record with the new schema to new files. For data records that arrive later with a schema of an earlier version, the connector projects the data record to the latest schema before writing to the same set of files in HDFS.

  • FORWARD Compatibility: If a schema is evolved in a forward-compatible manner, you can always use the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema will use the default value when the field is missing.

    If FORWARD is specified in the schema.compatibility, the connector projects the data to the oldest schema before writing to the same set of files in HDFS.

  • FULL Compatibility: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema.

    If FULL is specified in the schema.compatibility, the connector performs the same action as BACKWARD.
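
The BACKWARD behavior described above can be sketched in a few lines. This is a simplified model, not the connector's implementation. A schema is reduced to a mapping from field name to default value, versions are plain integers, and the names project_record, handle_backward, and NO_DEFAULT are hypothetical.

```python
NO_DEFAULT = object()  # Marker for a field that has no default value.


def project_record(record: dict, target_fields: dict) -> dict:
    """Project a record onto a target schema: drop extras, fill in defaults."""
    projected = {}
    for name, default in target_fields.items():
        if name in record:
            projected[name] = record[name]
        elif default is not NO_DEFAULT:
            projected[name] = default
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    return projected


def handle_backward(record, record_version, latest_version, latest_fields):
    """Decide what happens to a record under BACKWARD compatibility.

    Returns ("rotate", record) when the record carries a newer schema, meaning
    the current files are committed and new files are started. Otherwise
    returns ("append", projected_record) for the current set of files.
    """
    if record_version > latest_version:
        return "rotate", record
    return "append", project_record(record, latest_fields)


# The latest schema (version 2) dropped "old" and added "f2" with a default.
latest_fields = {"f1": NO_DEFAULT, "f2": "n/a"}
print(handle_backward({"f1": "a", "old": 1}, 1, 2, latest_fields))
```

In this model an older record loses the removed field and gains the default for the added one, which is why the latest schema can query every file uniformly.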

If Hive integration is enabled, you must specify the schema.compatibility to be BACKWARD, FORWARD, or FULL. This ensures that the Hive table schema is able to query all the data under a topic written with different schemas. If the schema.compatibility is set to BACKWARD or FULL, the Hive table schema for a topic will be equivalent to the latest schema in the HDFS files under that topic that can query the whole data of that topic. If the schema.compatibility is set to FORWARD, the Hive table schema of a topic is equivalent to the oldest schema of the HDFS files under that topic that can query the whole data of that topic.