Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
Google Cloud Dataproc Sink Connector for Confluent Platform¶
Note
If you are using Confluent Cloud, see https://docs.confluent.io/cloud/current/connectors/cc-gcp-dataproc-sink.html for the cloud Quick Start.
The Kafka Connect Google Cloud Dataproc Sink Connector integrates Apache Kafka® with managed HDFS instances in Google Cloud Dataproc.
The connector periodically polls data from Kafka and writes this data to HDFS. The data from each Kafka topic is partitioned by the provided partitioner and divided into chunks. Each chunk of data is represented as an HDFS file. The HDFS filename is a combination of the topic, Kafka partition, with the start and end offsets of the data chunk.
If no partitioner is specified in the configuration, the default partitioner that preserves the Kafka partitioning is used. The size of each data chunk is determined by the number of records written to HDFS, the time the data was written to HDFS, and schema compatibility.
The Kafka Connect Google Cloud Dataproc connector integrates with Hive. When it is enabled, the connector automatically creates an external Hive partitioned table for each Kafka topic and updates the table according to the available data in HDFS.
Prerequisites¶
The following are required to run the Kafka Connect Dataproc Sink Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Java 1.8
- GCP Dataproc cluster deployed with image version 1.4.x or above
Features¶
The Dataproc connector offers features provided by HDFS 2 Sink Connector for Confluent Platform. In addition, the following features are provided specifically for managed HDFS clusters on Google Dataproc:
- Seamless Dataproc Integration: Just provide the connector with GCP credentials and Dataproc cluster name, region, and project. No need to figure out HDFS URL or adjust Hadoop configs.
- Supporting HA clusters: No special configuration is required to connect to multi-master HA cluster.
- HTTP Proxy: Connecting to Dataproc through HTTP proxy with or without basic authentication is supported.
Install the Google Cloud Dataproc Sink Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest
) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-gcp-dataproc-sink:latest
You can install a specific version by replacing latest
with a version number. For example:
confluent-hub install confluentinc/kafka-connect-gcp-dataproc-sink:1.0.0-preview
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Google Cloud Dataproc Sink Connector Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Quick Start¶
This quick start uses the Dataproc connector to export data produced by the Avro console producer to HDFS in a Dataproc managed cluster.
Prerequisites¶
- Cloud Dataproc Prerequisites
- Google Cloud Platform (GCP) Account
- A GCP project and billing enabled, steps here.
- A Dataproc cluster created following steps here. Take note of the region and cluster name. They will be used in the connector config.
- Confluent Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
Set up Credentials¶
Create a service account and service account key under the GCP project.
- Open the IAM & Admin page in the GCP Console.
- Select your project and click Continue.
- In the left navigation panel, click Service accounts.
- In the top toolbar, click Create Service Account.
- Enter the service account name and description; for example
test-service-account
. - Click Create and on the next page select the role
Dataproc Administrator
under Dataproc and the roleStorage Object Viewer
under Storage. - On the next page click Create Key and download the JSON file.
- For this quickstart, save the file under your
$home
directory and name itcredentials.json
.
For more information on service account keys, see the Google documentation.
Load the Connector¶
This quick start assumes that security is not configured for HDFS and Hive metastore. To make the necessary security configurations, see Secure HDFS and Hive Metastore.
First, start all the necessary services using the Confluent CLI.
Tip
If not already in your PATH, add Confluent’s bin
directory by running: export PATH=<path-to-confluent>/bin:$PATH
Tip
Make sure to run the connector somewhere with network access to Dataproc cluster, such as a Google Compute Engine VM on the same subnet.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local
. For example, the syntax for confluent start
is now
confluent local start
. For more information, see confluent local.
confluent local start
Next, start the Avro console producer to import a few records to Kafka:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_dataproc \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Then in the console producer, type:
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
The three records entered are published to the Kafka topic test_dataproc
in Avro format.
Before starting the connector, make sure that the configurations in
etc/gcp-dataproc-sink-quickstart.properties
are properly set to your configurations of
Dataproc. For example, $home
is replaced by your home directory path; YOUR-PROJECT-ID
,
YOUR-CLUSTER-REGION
, and YOUR-CLUSTER-NAME
are replaced by your perspective values.
Then, start connector by loading its configuration with the following command.
Caution
You must include a double dash (--
) between the topic name and your flag. For more information,
see this post.
confluent local load dataproc-sink -- -d etc/gcp-dataproc-sink-quickstart.properties
{
"name": "dataproc-sink",
"config": {
"topics": "test_dataproc",
"tasks.max": "1",
"flush.size": "3",
"connector.class": "io.confluent.connect.gcp.dataproc.DataprocSinkConnector",
"gcp.dataproc.credentials.path": "/home/user/credentials.json",
"gcp.dataproc.projectId": "dataproc-project-id",
"gcp.dataproc.region": "us-west1",
"gcp.dataproc.cluster": "dataproc-cluster-name",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"name": "dataproc-sink"
},
"tasks": [],
"type": "sink"
}
To check that the connector started successfully, view the Connect worker’s log by running:
confluent local log connect
Towards the end of the log you should see that the connector starts, logs a few messages, and then exports data from Kafka to HDFS. After the connector finishes ingesting data to HDFS, check that the data is available in HDFS. From the HDFS namenode in Dataproc:
hadoop fs -ls /topics/test_dataproc/partition=0
You should see a file with the name /topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro
The file name is encoded as topic+kafkaPartition+startOffset+endOffset.format
.
You can use avro-tools-1.9.1.jar
(available in Apache mirrors)
to extract the content of the file. Run avro-tools
directly on Hadoop as:
hadoop jar avro-tools-1.9.1.jar tojson \
hdfs://<namenode>/topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro
where “<namenode>” is the HDFS name node hostname. Usually, the name node hostname will be your clustername with “-m” postfix attached.
Or, if you experience issues, first copy the avro file from HDFS to the local filesystem and try again with Java:
hadoop fs -copyToLocal /topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro \
/tmp/test_dataproc+0+0000000000+0000000002.avro
java -jar avro-tools-1.9.1.jar tojson /tmp/test_dataproc+0+0000000000+0000000002.avro
You should see the following output:
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
Finally, stop the Connect worker as well as all the rest of Confluent Platform by running:
confluent local stop
or stop all the services and additionally wipe out any data generated during this quick start by running:
confluent local destroy
Note
If you want to run the quick start with Hive integration,
you need to add the following configurations to
etc/gcp-dataproc-sink-quickstart.properties
before starting the connector:
hive.integration=true
hive.metastore.uris=thrift://<namenode>:9083
schema.compatibility=BACKWARD
After the connector finishes ingesting data to HDFS, you can use Hive to check the data from your namenode:
$hive>SELECT * FROM test_dataproc;
Note
If you leave the hive.metastore.uris
empty, an embedded Hive metastore will be
created in the directory the connector is started. You need to start Hive in that specific
directory to query the data.
Configuration¶
This section gives example configurations that cover common scenarios. For a complete description of the available configuration options, see Google Cloud Dataproc Sink Connector Configuration Properties.
Format and Partitioner¶
You need to specify the format.class
and partitioner.class
if you want to write other
formats to HDFS or use other partitioners. The following example configurations demonstrates how to
write Parquet format and use hourly partitioner:
format.class=io.confluent.connect.gcp.dataproc.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.gcp.dataproc.hdfs.partitioner.HourlyPartitioner
Note
If you want to use the field partitioner, you need to specify the partition.field.name
configuration as well to specify the field name of the record.
Hive Integration¶
At minimum, you need to specify hive.integration
, hive.metastore.uris
and
schema.compatibility
when integrating Hive. Here is an example configuration:
hive.integration=true
hive.metastore.uris=thrift://<namenode>:9083
schema.compatibility=BACKWARD
You should adjust the hive.metastore.uris
according to your Hive configurations.
Note
If you don’t specify the hive.metastore.uris
, the connector will use a local metastore
with Derby in the directory running the connector. You need to run Hive in this directory
in order to see the Hive metadata change.
Also, to support schema evolution, set the schema.compatibility
to be BACKWARD
, FORWARD
, or
FULL
. This ensures that Hive can query the data written to HDFS with different schemas using the
latest Hive table schema. For more information on schema compatibility, see Schema Evolution.
Secure HDFS and Hive Metastore¶
To work with secure HDFS and Hive metastore, you need to specify hdfs.authentication.kerberos
,
connect.hdfs.principal
, connect.keytab
, hdfs.namenode.principal
:
hdfs.authentication.kerberos=true
connect.hdfs.principal=connect-hdfs/_HOST@YOUR-REALM.COM
connect.hdfs.keytab=path to the connector keytab
hdfs.namenode.principal=namenode principal
You need to create the Kafka connect principals and keytab files using Kerberos and distribute the keytab file to all hosts that running the connector and ensures that only the connector user has read access to the keytab file.
Note
Currently, the connector requires that the principal and the keytab path to be the same
on all the hosts running the connector. The host part of the hdfs.namenode.prinicipal
needs
to be the actual FQDN of the Namenode host instead of the _HOST
placeholder.
Schema Evolution¶
The Dataproc connector supports schema evolution and reacts to schema changes of data according to the
schema.compatibility
configuration. This section explains how the
connector reacts to schema evolution under different values of schema.compatibility
. The
schema.compatibility
can be set to NONE
, BACKWARD
, FORWARD
, and FULL
, which means
NO compatibility, BACKWARD compatibility, FORWARD compatibility, and FULL compatibility respectively.
NO Compatibility: By default, the
schema.compatibility
is set toNONE
. In this case, the connector ensures that each file written to HDFS has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with new schema in new files.BACKWARD Compatibility: If a schema is evolved in a backward compatible manner, you can always use the latest schema to query all the data uniformly. For example, removing fields is a backward-compatible change to a schema, since when the connector encounters records written with the old schema that contains these fields it can just ignore them. Adding a field with a default value is also backward compatible.
If
BACKWARD
is specified in theschema.compatibility
, the connector keeps track of the latest schema used in writing data to HDFS, and if a data record with a schema version larger than current latest schema arrives, the connector commits the current set of files and writes the data record with new schema to new files. For data records arriving at a later time with a schema of an earlier version, the connector projects the data record to the latest schema before writing to the same set of files in HDFS.FORWARD Compatibility: If a schema is evolved in a forward-compatible manner, you can always use the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema will use the default value when the field is missing.
If
FORWARD
is specified in theschema.compatibility
, the connector projects the data to the oldest schema before writing to the same set of files in HDFS.FULL Compatibility: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema.
If
FULL
is specified in theschema.compatibility
, the connector performs the same action asBACKWARD
.
If Hive integration is enabled, you must specify the schema.compatibility
to be BACKWARD
,
FORWARD
, or FULL
. This ensures that the Hive table schema is able to query all the data under
a topic written with different schemas. If the schema.compatibility
is set to BACKWARD
or
FULL
, the Hive table schema for a topic will be equivalent to the latest schema in the HDFS files
under that topic that can query the whole data of that topic. If the schema.compatibility
is
set to FORWARD
, the Hive table schema of a topic is equivalent to the oldest schema of the HFDS
files under that topic that can query the whole data of that topic.