Google Cloud Dataproc Sink Connector for Confluent Platform¶
The Kafka Connect Google Cloud Dataproc Sink connector integrates Apache Kafka® with managed HDFS instances in Google Cloud Dataproc.
The connector periodically polls data from Kafka and writes this data to HDFS. The data from each Kafka topic is partitioned by the provided partitioner and divided into chunks. Each chunk of data is represented as an HDFS file. The HDFS filename is a combination of the topic, the Kafka partition, and the start and end offsets of the data chunk.
If no partitioner is specified in the configuration, the default partitioner that preserves the Kafka partitioning is used. The size of each data chunk is determined by the number of records written to HDFS, the time the data was written to HDFS, and schema compatibility.
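As a rough sketch, the chunk size is typically tuned with the flush.size and rotation properties inherited from the HDFS 2 Sink connector; the values below are placeholders, not recommendations:
# commit a file after this many records
flush.size=1000
# or commit on a time interval (milliseconds)
rotate.interval.ms=600000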
The Kafka Connect Google Cloud Dataproc connector integrates with Hive. When it is enabled, the connector automatically creates an external Hive partitioned table for each Kafka topic and updates the table according to the available data in HDFS.
Features¶
The Dataproc Sink connector offers the features provided by the HDFS 2 Sink connector for Confluent Platform. In addition, the following features are provided specifically for managed HDFS clusters on Google Dataproc:
- At least once delivery
- Dead Letter Queue
- Multiple tasks
- Seamless Dataproc integration
- Supporting HA clusters
- HTTP proxy
At least once delivery¶
This connector guarantees that records are delivered at least once from the Kafka topic.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
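For example, a DLQ can be enabled with the standard Kafka Connect error-handling properties in the connector configuration; the topic name and replication factor below are placeholders:
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-dataproc-sink
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true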
Multiple tasks¶
The Google Cloud Dataproc Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when multiple files need to be parsed.
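For example, to allow up to three tasks, set the following in the connector configuration (the value is illustrative; size it to your topic partition count and workload):
tasks.max=3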
Seamless Dataproc integration¶
Provide the connector with GCP credentials and the Dataproc cluster name, region, and project; there is no need to figure out the HDFS URL or adjust Hadoop configurations.
Supporting HA clusters¶
No special configuration is required to connect to a multi-master HA cluster.
HTTP proxy¶
Connecting to Dataproc through HTTP proxy with or without basic authentication is supported.
Limitations¶
The connector does not currently support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:
io.debezium.transforms.ByLogicalTableRouter
io.debezium.transforms.outbox.EventRouter
org.apache.kafka.connect.transforms.RegexRouter
org.apache.kafka.connect.transforms.TimestampRouter
io.confluent.connect.transforms.MessageTimestampRouter
io.confluent.connect.transforms.ExtractTopic$Key
io.confluent.connect.transforms.ExtractTopic$Value
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and Confluent License Properties for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Google Cloud Dataproc Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the Google Cloud Dataproc Sink Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
Connect: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
An installation of the Confluent CLI.
Java 1.8.
A GCP project with billing enabled (steps here).
A Dataproc cluster created by following the steps here. The cluster must be deployed with image version 1.4.x or later. Take note of the region and cluster name; they are used in the connector configuration.
An installation of the latest (latest) connector version. To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-gcp-dataproc-sink:latest
You can install a specific version by replacing latest with a version number as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-gcp-dataproc-sink:1.3.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Quick Start¶
This quick start uses the Dataproc connector to export data produced by the Avro console producer to HDFS in a Dataproc managed cluster.
Set up credentials¶
Create a service account and service account key under the GCP project.
- Open the IAM & Admin page in the GCP Console.
- Select your project and click Continue.
- In the left navigation panel, click Service accounts.
- In the top toolbar, click Create Service Account.
- Enter the service account name and description; for example, test-service-account.
- Click Create and on the next page select the role Dataproc Administrator under Dataproc and the role Storage Object Viewer under Storage.
- On the next page, click Create Key and download the JSON file.
- For this quickstart, save the file under your $home directory and name it credentials.json.
For more information on service account keys, see the Google documentation.
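If you prefer the command line, the same setup can be sketched with the gcloud CLI; the project ID and account name below are placeholders that mirror the console steps above:
# create the service account
gcloud iam service-accounts create test-service-account --project=YOUR-PROJECT-ID
# grant the Dataproc Administrator and Storage Object Viewer roles
gcloud projects add-iam-policy-binding YOUR-PROJECT-ID \
  --member="serviceAccount:test-service-account@YOUR-PROJECT-ID.iam.gserviceaccount.com" \
  --role="roles/dataproc.admin"
gcloud projects add-iam-policy-binding YOUR-PROJECT-ID \
  --member="serviceAccount:test-service-account@YOUR-PROJECT-ID.iam.gserviceaccount.com" \
  --role="roles/storage.objectViewer"
# create and download the JSON key
gcloud iam service-accounts keys create $HOME/credentials.json \
  --iam-account=test-service-account@YOUR-PROJECT-ID.iam.gserviceaccount.com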
Load the Connector¶
This quick start assumes that security is not configured for HDFS and Hive metastore. To make the necessary security configurations, see Secure HDFS and Hive Metastore.
First, start all the necessary services using the Confluent CLI.
Tip
- If not already in your PATH, add Confluent’s bin directory by running: export PATH=<path-to-confluent>/bin:$PATH
- Ensure you run the connector somewhere with network access to the Dataproc cluster, such as a Google Compute Engine VM on the same subnet.
confluent local services start
Next, start the Avro console producer to import a few records to Kafka:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_dataproc \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Then in the console producer, type:
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
The three records entered are published to the Kafka topic test_dataproc in Avro format.
Before starting the connector, make sure that the configurations in etc/gcp-dataproc-sink-quickstart.properties are properly set to your Dataproc configuration. For example, $home is replaced by your home directory path; YOUR-PROJECT-ID, YOUR-CLUSTER-REGION, and YOUR-CLUSTER-NAME are replaced by your respective values.
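A filled-in etc/gcp-dataproc-sink-quickstart.properties might look like the following sketch; the values mirror the loaded configuration shown below and are placeholders for your environment:
name=dataproc-sink
connector.class=io.confluent.connect.gcp.dataproc.DataprocSinkConnector
tasks.max=1
topics=test_dataproc
flush.size=3
gcp.dataproc.credentials.path=/home/user/credentials.json
gcp.dataproc.projectId=YOUR-PROJECT-ID
gcp.dataproc.region=YOUR-CLUSTER-REGION
gcp.dataproc.cluster=YOUR-CLUSTER-NAME
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1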
Then, start the connector by loading its configuration with the following command.
Caution
You must include a double dash (--) between the topic name and your flag. For more information, see this post.
confluent local services connect connector load dataproc-sink --config etc/gcp-dataproc-sink-quickstart.properties
{
"name": "dataproc-sink",
"config": {
"topics": "test_dataproc",
"tasks.max": "1",
"flush.size": "3",
"connector.class": "io.confluent.connect.gcp.dataproc.DataprocSinkConnector",
"gcp.dataproc.credentials.path": "/home/user/credentials.json",
"gcp.dataproc.projectId": "dataproc-project-id",
"gcp.dataproc.region": "us-west1",
"gcp.dataproc.cluster": "dataproc-cluster-name",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"name": "dataproc-sink"
},
"tasks": [],
"type": "sink"
}
To check that the connector started successfully, view the Connect worker’s log by running:
confluent local services connect log
Towards the end of the log you should see that the connector starts, logs a few messages, and then exports data from Kafka to HDFS. After the connector finishes ingesting data to HDFS, check that the data is available in HDFS. From the HDFS namenode in Dataproc:
hadoop fs -ls /topics/test_dataproc/partition=0
You should see a file with the name /topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro. The file name is encoded as topic+kafkaPartition+startOffset+endOffset.format.
You can use avro-tools-1.9.1.jar (available in Apache mirrors) to extract the content of the file. Run avro-tools directly on Hadoop as:
hadoop jar avro-tools-1.9.1.jar tojson \
hdfs://<namenode>/topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro
where <namenode> is the HDFS Namenode hostname. Usually, the Namenode hostname is your cluster name with an "-m" suffix.
Or, if you experience issues, first copy the Avro file from HDFS to the local filesystem and try again with Java:
hadoop fs -copyToLocal /topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro \
/tmp/test_dataproc+0+0000000000+0000000002.avro
java -jar avro-tools-1.9.1.jar tojson /tmp/test_dataproc+0+0000000000+0000000002.avro
You should see the following output:
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
Finally, stop the Connect worker as well as all the rest of Confluent Platform by running:
confluent local services stop
or stop all the services and additionally wipe out any data generated during this quick start by running:
confluent local destroy
Note
If you want to run the quick start with Hive integration, you need to add the following configurations to etc/gcp-dataproc-sink-quickstart.properties before starting the connector:
hive.integration=true
hive.metastore.uris=thrift://<namenode>:9083
schema.compatibility=BACKWARD
After the connector finishes ingesting data to HDFS, you can use Hive to check the data from your namenode:
hive> SELECT * FROM test_dataproc;
Note
If you leave the hive.metastore.uris empty, an embedded Hive metastore will be created in the directory where the connector is started. You need to start Hive in that specific directory to query the data.
Configuration¶
This section gives example configurations that cover common scenarios. For a complete description of the available configuration options, see Configuration Reference for Google Cloud Dataproc Sink Connector for Confluent Platform.
Format and Partitioner¶
You need to specify the format.class and partitioner.class if you want to write other formats to HDFS or use other partitioners. The following example configuration demonstrates how to write Parquet format and use the hourly partitioner:
format.class=io.confluent.connect.gcp.dataproc.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.gcp.dataproc.hdfs.partitioner.HourlyPartitioner
Note
If you want to use the field partitioner, you need to specify the partition.field.name configuration as well to specify the field name of the record.
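For example, a field-partitioner configuration might look like the following sketch; the partitioner class name follows the package pattern shown above and the field name is illustrative, so verify both against the Configuration Reference:
partitioner.class=io.confluent.connect.gcp.dataproc.hdfs.partitioner.FieldPartitioner
partition.field.name=f1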
Hive Integration¶
At minimum, you need to specify hive.integration, hive.metastore.uris, and schema.compatibility when integrating Hive. Here is an example configuration:
hive.integration=true
hive.metastore.uris=thrift://<namenode>:9083
schema.compatibility=BACKWARD
You should adjust the hive.metastore.uris according to your Hive configurations.
Note
If you don’t specify the hive.metastore.uris, the connector will use a local metastore with Derby in the directory running the connector. You need to run Hive in this directory in order to see the Hive metadata change.
Also, to support schema evolution, set the schema.compatibility to be BACKWARD, FORWARD, or FULL. This ensures that Hive can query the data written to HDFS with different schemas using the latest Hive table schema. For more information on schema compatibility, see Schema Evolution.
Secure HDFS and Hive Metastore¶
To work with secure HDFS and Hive metastore, you need to specify hdfs.authentication.kerberos, connect.hdfs.principal, connect.hdfs.keytab, and hdfs.namenode.principal:
hdfs.authentication.kerberos=true
connect.hdfs.principal=connect-hdfs/_HOST@YOUR-REALM.COM
connect.hdfs.keytab=path to the connector keytab
hdfs.namenode.principal=namenode principal
You need to create the Kafka Connect principals and keytab files using Kerberos, distribute the keytab file to all hosts running the connector, and ensure that only the connector user has read access to the keytab file.
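For example, with MIT Kerberos the principal and keytab might be created as follows; the realm, hostname, paths, and user are placeholders to adapt to your KDC:
kadmin.local -q "addprinc -randkey connect-hdfs/connect-host.example.com@YOUR-REALM.COM"
kadmin.local -q "ktadd -k /etc/security/keytabs/connect-hdfs.keytab connect-hdfs/connect-host.example.com@YOUR-REALM.COM"
# restrict the keytab to the user running the connector
chown connect-user /etc/security/keytabs/connect-hdfs.keytab
chmod 400 /etc/security/keytabs/connect-hdfs.keytab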
Note
Currently, the connector requires that the principal and the keytab path be the same on all the hosts running the connector. The host part of the hdfs.namenode.principal needs to be the actual FQDN of the Namenode host instead of the _HOST placeholder.
Schema Evolution¶
The Dataproc connector supports schema evolution and reacts to schema changes of data according to the schema.compatibility configuration. This section explains how the connector reacts to schema evolution under different values of schema.compatibility. The schema.compatibility can be set to NONE, BACKWARD, FORWARD, and FULL, which means NO compatibility, BACKWARD compatibility, FORWARD compatibility, and FULL compatibility, respectively.
- NO Compatibility: By default, the schema.compatibility is set to NONE. In this case, the connector ensures that each file written to HDFS has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with the new schema in new files.
- BACKWARD Compatibility: If a schema is evolved in a backward-compatible manner, you can always use the latest schema to query all the data uniformly. For example, removing fields is a backward-compatible change to a schema, since when the connector encounters records written with the old schema that contain these fields, it can just ignore them. Adding a field with a default value is also backward compatible.
If BACKWARD is specified in the schema.compatibility, the connector keeps track of the latest schema used in writing data to HDFS, and if a data record with a schema version larger than the current latest schema arrives, the connector commits the current set of files and writes the data record with the new schema to new files. For data records arriving at a later time with a schema of an earlier version, the connector projects the data record to the latest schema before writing to the same set of files in HDFS.
- FORWARD Compatibility: If a schema is evolved in a forward-compatible manner, you can always use the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema will use the default value when the field is missing.
If FORWARD is specified in the schema.compatibility, the connector projects the data to the oldest schema before writing to the same set of files in HDFS.
- FULL Compatibility: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema.
If FULL is specified in the schema.compatibility, the connector performs the same action as BACKWARD.
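To make these rules concrete, here is a minimal Avro sketch of a backward-compatible evolution of the quick start record; the added field name is illustrative. Because the new field has a default value, records written with the old schema can still be read with the new schema.
Old schema:
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}
New schema (adds a field with a default):
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name":"f2","type":"string","default":""}]}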
If Hive integration is enabled, you must specify the schema.compatibility to be BACKWARD, FORWARD, or FULL. This ensures that the Hive table schema is able to query all the data under a topic written with different schemas. If the schema.compatibility is set to BACKWARD or FULL, the Hive table schema for a topic will be equivalent to the latest schema in the HDFS files under that topic that can query the whole data of that topic. If the schema.compatibility is set to FORWARD, the Hive table schema of a topic is equivalent to the oldest schema of the HDFS files under that topic that can query the whole data of that topic.
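For example, after a backward-compatible schema change with BACKWARD or FULL compatibility, you can confirm from your namenode that the Hive table reflects the latest schema; the query is illustrative and uses the quick start table:
hive> DESCRIBE test_dataproc;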