.. _connect_dataproc_sink:
Google Cloud Dataproc Sink Connector for |cp|
=============================================
.. note::
If you are using |ccloud|, see https://docs.confluent.io/cloud/current/connectors/cc-gcp-dataproc-sink.html for the cloud Quick Start.
The |kconnect-long| Google Cloud Dataproc Sink Connector integrates |ak-tm| with
managed HDFS instances in `Google Cloud Dataproc `__.
The connector periodically polls data from |ak| and writes this data to HDFS.
The data from each |ak| topic is partitioned by the provided partitioner and
divided into chunks. Each chunk of data is represented as an HDFS file. The HDFS filename is a combination of the topic, the |ak| partition, and the start and end offsets of the data chunk.
If no partitioner is specified in the configuration, the default partitioner
that preserves the |ak| partitioning is used. The size of each data chunk is
determined by the number of records written to HDFS, the time the data was written to HDFS, and schema compatibility.
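For example, a minimal sketch of the sizing-related properties: ``flush.size`` also appears in the quick start below, while ``rotate.interval.ms`` is assumed here to be inherited from the :ref:`connect_hdfs` connector, so verify it against the configuration reference for your version:

.. codewithvars:: bash

   # Sketch only: close and commit a file after 1000 records per topic partition,
   # or after 10 minutes, whichever comes first.
   flush.size=1000
   rotate.interval.ms=600000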
The |kconnect-long| Google Cloud Dataproc connector integrates with Hive. When
Hive integration is enabled, the connector automatically creates an external partitioned
Hive table for each |ak| topic and updates the table according to the available data
in HDFS.
Prerequisites
-------------
The following are required to run the |kconnect-long| Dataproc Sink Connector:
* |ak| Broker: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* |kconnect|: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* Java 1.8
* GCP Dataproc cluster deployed with image version 1.4.x or above
Features
--------
The Dataproc connector offers features provided by :ref:`connect_hdfs`. In addition, the following
features are provided specifically for managed HDFS clusters on Google Dataproc:
* **Seamless Dataproc Integration**: Provide the connector with GCP credentials and the Dataproc
  cluster name, region, and project; there is no need to determine the HDFS URL or adjust Hadoop configurations.
* **HA Cluster Support**: No special configuration is required to connect to a multi-master HA cluster.
* **HTTP Proxy**: Connecting to Dataproc through an HTTP proxy, with or without basic authentication, is
  supported.
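For example, pointing the connector at a cluster only requires the connection properties that also appear in the quick start below; the values here are placeholders:

.. codewithvars:: bash

   # Placeholder values; replace with your own key file, project, region, and cluster.
   gcp.dataproc.credentials.path=/home/user/credentials.json
   gcp.dataproc.projectId=YOUR-PROJECT-ID
   gcp.dataproc.region=us-west1
   gcp.dataproc.cluster=YOUR-CLUSTER-NAME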
Install the Google Cloud Dataproc Sink Connector
------------------------------------------------
.. include:: ../../includes/connector-install.rst
.. include:: ../../includes/connector-install-hub.rst
::
confluent-hub install confluentinc/kafka-connect-gcp-dataproc-sink:latest
.. include:: ../../includes/connector-install-version.rst
::
confluent-hub install confluentinc/kafka-connect-gcp-dataproc-sink:1.0.0-preview
------------------------------
Install the connector manually
------------------------------
`Download and extract the ZIP file `__ for your connector and then follow the manual connector installation :ref:`instructions `.
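As a rough sketch (the directory and archive name below are illustrative), manual installation amounts to extracting the downloaded archive into a directory listed in the Connect worker's ``plugin.path`` and restarting the worker:

.. codewithvars:: bash

   # Illustrative path and file name; use the archive you downloaded and a directory
   # that is on your worker's plugin.path.
   unzip confluentinc-kafka-connect-gcp-dataproc-sink-1.0.0-preview.zip \
     -d /usr/local/share/kafka/plugins/
   # Restart the Connect worker so the new plugin is picked up.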
License
-------
.. include:: ../../includes/enterprise-license.rst
See :ref:`dataproc-sink-connector-license-config` for license properties and :ref:`dataproc-sink-license-topic-configuration` for information about the license topic.
Configuration Properties
------------------------
For a complete list of configuration properties for this connector, see :ref:`connect_dataproc_sink_config_options`.
.. include:: ../../includes/connect-to-cloud-note.rst
Quick Start
-----------
This quick start uses the Dataproc connector to export data produced by the Avro console producer
to HDFS in a Dataproc managed cluster.
-------------
Prerequisites
-------------
Cloud Dataproc Prerequisites
- `Google Cloud Platform (GCP) Account `_
- A GCP project and billing enabled, `steps here `__.
- A Dataproc cluster created following `steps here `__.
Take note of the region and cluster name. They will be used in the connector config.
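If you prefer the ``gcloud`` CLI over the console, a cluster suitable for this quick start can be created roughly as follows (the name, region, and image version are illustrative):

.. codewithvars:: bash

   # Illustrative values; any image version 1.4.x or above works with this connector.
   gcloud dataproc clusters create test-dataproc-cluster \
     --region=us-west1 \
     --image-version=1.4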
Confluent Prerequisites
- :ref:`Confluent Platform `
- :ref:`Confluent CLI ` (requires separate installation)
------------------
Set up Credentials
------------------
Create a service account and service account key under the GCP project.
#. Open the **IAM & Admin** page in the GCP Console.
#. Select your project and click **Continue**.
#. In the left navigation panel, click **Service accounts**.
#. In the top toolbar, click **Create Service Account**.
#. Enter the service account name and description; for example ``test-service-account``.
#. Click **Create**. On the next page, select the role ``Dataproc Administrator`` under **Dataproc** and the role ``Storage Object Viewer`` under **Storage**.
#. On the next page click **Create Key** and download the JSON file.
#. For this quick start, save the file under your ``$home`` directory and name it ``credentials.json``.
For more information on service account keys, see the `Google documentation `_.
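The same setup can be scripted with the ``gcloud`` CLI. The following is a rough equivalent of the console steps above; ``YOUR-PROJECT-ID`` is a placeholder, and the role IDs correspond to **Dataproc Administrator** and **Storage Object Viewer**:

.. codewithvars:: bash

   # Create the service account.
   gcloud iam service-accounts create test-service-account \
     --display-name="test-service-account"

   # Grant the Dataproc Administrator and Storage Object Viewer roles.
   gcloud projects add-iam-policy-binding YOUR-PROJECT-ID \
     --member="serviceAccount:test-service-account@YOUR-PROJECT-ID.iam.gserviceaccount.com" \
     --role="roles/dataproc.admin"
   gcloud projects add-iam-policy-binding YOUR-PROJECT-ID \
     --member="serviceAccount:test-service-account@YOUR-PROJECT-ID.iam.gserviceaccount.com" \
     --role="roles/storage.objectViewer"

   # Create and download the JSON key to your home directory.
   gcloud iam service-accounts keys create ~/credentials.json \
     --iam-account=test-service-account@YOUR-PROJECT-ID.iam.gserviceaccount.com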
------------------
Load the Connector
------------------
This quick start assumes that security is not configured for HDFS and Hive metastore. To make the necessary security
configurations, see :ref:`dataproc-secure-hdfs-hive-metastore`.
First, start all the necessary services using the |confluent-cli|.
.. tip::
If not already in your PATH, add Confluent's ``bin`` directory by running: ``export PATH=<path-to-confluent>/bin:$PATH``
.. tip::
Make sure to run the connector somewhere with network access to the Dataproc cluster, such as a
Google Compute Engine VM on the same subnet.
.. include:: ../../../includes/cli-new.rst
.. codewithvars:: bash
|confluent_start|
Next, start the Avro console producer to import a few records to |ak|:
.. codewithvars:: bash
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_dataproc \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Then in the console producer, type:
.. codewithvars:: bash
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
The three records entered are published to the |ak| topic ``test_dataproc`` in Avro format.
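Optionally, before starting the connector, you can verify that the records reached the topic with the Avro console consumer (press Ctrl+C to exit):

.. codewithvars:: bash

   ./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 \
     --topic test_dataproc --from-beginning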
Before starting the connector, make sure that the settings in
``etc/gcp-dataproc-sink-quickstart.properties`` match your Dataproc deployment. For example,
``$home`` must be replaced by your home directory path, and ``YOUR-PROJECT-ID``,
``YOUR-CLUSTER-REGION``, and ``YOUR-CLUSTER-NAME`` must be replaced by your respective values.
Then, start the connector by loading its configuration with the following command.
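For reference, a filled-in ``etc/gcp-dataproc-sink-quickstart.properties`` looks roughly like the following; the key path, project ID, region, and cluster name are placeholders that must match your deployment:

.. codewithvars:: bash

   name=dataproc-sink
   connector.class=io.confluent.connect.gcp.dataproc.DataprocSinkConnector
   tasks.max=1
   topics=test_dataproc
   flush.size=3
   gcp.dataproc.credentials.path=/home/user/credentials.json
   gcp.dataproc.projectId=YOUR-PROJECT-ID
   gcp.dataproc.region=YOUR-CLUSTER-REGION
   gcp.dataproc.cluster=YOUR-CLUSTER-NAME
   confluent.topic.bootstrap.servers=localhost:9092
   confluent.topic.replication.factor=1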
.. include:: ../../../includes/confluent-local-consume-limit.rst
.. codewithvars:: bash
|confluent_load| dataproc-sink|dash| -d etc/gcp-dataproc-sink-quickstart.properties
{
"name": "dataproc-sink",
"config": {
"topics": "test_dataproc",
"tasks.max": "1",
"flush.size": "3",
"connector.class": "io.confluent.connect.gcp.dataproc.DataprocSinkConnector",
"gcp.dataproc.credentials.path": "/home/user/credentials.json",
"gcp.dataproc.projectId": "dataproc-project-id",
"gcp.dataproc.region": "us-west1",
"gcp.dataproc.cluster": "dataproc-cluster-name",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"name": "dataproc-sink"
},
"tasks": [],
"type": "sink"
}
To check that the connector started successfully, view the Connect worker's log by running:
.. codewithvars:: bash
|confluent_log| connect
Towards the end of the log you should see that the connector starts, logs a few messages, and then exports
data from |ak| to HDFS.
After the connector finishes ingesting data to HDFS, check that the data is available in HDFS.
From the HDFS namenode in Dataproc:
.. codewithvars:: bash
hadoop fs -ls /topics/test_dataproc/partition=0
You should see a file named ``/topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro``.
The file name is encoded as ``topic+kafkaPartition+startOffset+endOffset.format``.
You can use ``avro-tools-1.9.1.jar``
(available in `Apache mirrors `_)
to extract the content of the file. Run ``avro-tools`` directly on Hadoop as:
.. codewithvars:: bash
hadoop jar avro-tools-1.9.1.jar tojson \
hdfs://<namenode>/topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro
where ``<namenode>`` is the HDFS name node hostname. Usually, the name node hostname is your
cluster name with an ``-m`` suffix.
Alternatively, if you experience issues, first copy the Avro file from HDFS to the local filesystem and try again with Java:
.. codewithvars:: bash
hadoop fs -copyToLocal /topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro \
/tmp/test_dataproc+0+0000000000+0000000002.avro
java -jar avro-tools-1.9.1.jar tojson /tmp/test_dataproc+0+0000000000+0000000002.avro
You should see the following output:
.. codewithvars:: bash
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
Finally, stop the Connect worker as well as all the rest of |cp| by running:
.. codewithvars:: bash
|confluent_stop|
or stop all the services and additionally wipe out any data generated during this quick start by running:
.. codewithvars:: bash
|confluent_destroy|
.. note:: If you want to run the quick start with Hive integration,
you need to add the following configurations to
``etc/gcp-dataproc-sink-quickstart.properties`` before starting the connector::
hive.integration=true
hive.metastore.uris=thrift://<namenode>:9083
schema.compatibility=BACKWARD
After the connector finishes ingesting data to HDFS, you can use Hive to check the data from your
namenode::
$hive>SELECT * FROM test_dataproc;
.. note:: If you leave ``hive.metastore.uris`` empty, an embedded Hive metastore is
   created in the directory where the connector is started. You need to start Hive in that
   specific directory to query the data.
Configuration
-------------
This section gives example configurations that cover common scenarios. For a complete description of the available
configuration options, see :ref:`connect_dataproc_sink_config_options`.
----------------------
Format and Partitioner
----------------------
You need to specify ``format.class`` and ``partitioner.class`` if you want to write other
formats to HDFS or use other partitioners. The following example configuration demonstrates how to
write in Parquet format and use the hourly partitioner:
.. codewithvars:: bash
format.class=io.confluent.connect.gcp.dataproc.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.gcp.dataproc.hdfs.partitioner.HourlyPartitioner
.. note:: If you want to use the field partitioner, you also need to set the ``partition.field.name``
   configuration to the name of the record field to partition by, as sketched below.
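A minimal sketch for the field partitioner, assuming the class follows the same package naming as the hourly partitioner above; ``department`` is an illustrative field name:

.. codewithvars:: bash

   # Class name assumed by analogy with the partitioners above; verify it against the
   # configuration reference. ``department`` is an example record field.
   partitioner.class=io.confluent.connect.gcp.dataproc.hdfs.partitioner.FieldPartitioner
   partition.field.name=department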
----------------
Hive Integration
----------------
At minimum, you need to specify ``hive.integration``, ``hive.metastore.uris`` and
``schema.compatibility`` when integrating Hive. Here is an example configuration:
.. codewithvars:: bash
hive.integration=true
hive.metastore.uris=thrift://<namenode>:9083
schema.compatibility=BACKWARD
You should adjust the ``hive.metastore.uris`` according to your Hive configurations.
.. note:: If you don't specify ``hive.metastore.uris``, the connector uses a local Derby
   metastore in the directory where the connector is running. You need to run Hive in this
   directory to see the Hive metadata changes.
Also, to support schema evolution, set the ``schema.compatibility`` to be ``BACKWARD``, ``FORWARD``, or
``FULL``. This ensures that Hive can query the data written to HDFS with different schemas using the
latest Hive table schema. For more information on schema compatibility, see :ref:`dataproc-hdfs-schema-evolution`.
.. _dataproc-secure-hdfs-hive-metastore:
------------------------------
Secure HDFS and Hive Metastore
------------------------------
To work with secure HDFS and a secure Hive metastore, you need to specify ``hdfs.authentication.kerberos``,
``connect.hdfs.principal``, ``connect.hdfs.keytab``, and ``hdfs.namenode.principal``:
.. codewithvars:: bash
hdfs.authentication.kerberos=true
connect.hdfs.principal=connect-hdfs/_HOST@YOUR-REALM.COM
connect.hdfs.keytab=path to the connector keytab
hdfs.namenode.principal=namenode principal
You need to create the |ak| Connect principals and keytab files using Kerberos, distribute the
keytab file to all hosts running the connector, and ensure that only the connector user
has read access to the keytab file.
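A rough sketch of these steps on an MIT Kerberos KDC follows; the principal, realm, keytab path, and connector user are all illustrative:

.. codewithvars:: bash

   # Run on the KDC host; names and paths are illustrative.
   kadmin.local -q "addprinc -randkey connect-hdfs/connect-host.example.com@YOUR-REALM.COM"
   kadmin.local -q "ktadd -k /etc/security/keytabs/connect.keytab connect-hdfs/connect-host.example.com@YOUR-REALM.COM"

   # Distribute the keytab to every connector host, then restrict access to the connector user.
   chown connect-user /etc/security/keytabs/connect.keytab
   chmod 600 /etc/security/keytabs/connect.keytab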
.. note:: Currently, the connector requires that the principal and the keytab path be the same
   on all the hosts running the connector. The host part of the ``hdfs.namenode.principal`` needs
   to be the actual FQDN of the Namenode host instead of the ``_HOST`` placeholder.
.. _dataproc-hdfs-schema-evolution:
Schema Evolution
----------------
The Dataproc connector supports schema evolution and reacts to schema changes in the data according to the
``schema.compatibility`` configuration. This section explains how the connector behaves under each value of
``schema.compatibility``, which can be set to ``NONE``, ``BACKWARD``, ``FORWARD``, or ``FULL``, meaning
no compatibility, backward compatibility, forward compatibility, and full compatibility, respectively.
* **NO Compatibility**: By default, the ``schema.compatibility`` is set to ``NONE``. In this case,
the connector ensures that each file written to HDFS has the proper schema. When the connector
observes a schema change in data, it commits the current set of files for the affected topic
partitions and writes the data with new schema in new files.
* **BACKWARD Compatibility**: If a schema is evolved in a backward-compatible manner, you can always
  use the latest schema to query all the data uniformly. For example, removing fields is a backward-compatible
  change to a schema, because when the connector encounters records written with the old
  schema that contain these fields, it can simply ignore them. Adding a field with a default value is
  also backward compatible (see the Avro schema sketch after this list).
  If ``BACKWARD`` is specified for ``schema.compatibility``, the connector keeps track
  of the latest schema used to write data to HDFS. If a data record arrives with a schema version
  later than the current latest schema, the connector commits the current set of files
  and writes the data record with the new schema to new files. For data records arriving later
  with a schema of an earlier version, the connector projects the data record to the latest schema
  before writing to the same set of files in HDFS.
* **FORWARD Compatibility**: If a schema is evolved in a forward-compatible manner, you can always
use the oldest schema to query all the data uniformly. Removing a field that had a default value
is forward compatible, since the old schema will use the default value when the field is missing.
If ``FORWARD`` is specified in the ``schema.compatibility``, the connector projects the data to
the oldest schema before writing to the same set of files in HDFS.
* **FULL Compatibility**: Full compatibility means that old data can be read with the new schema
and new data can also be read with the old schema.
If ``FULL`` is specified in the ``schema.compatibility``, the connector performs the same action
as ``BACKWARD``.
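As a concrete illustration, the following change to the quick start's ``myrecord`` schema is backward compatible: the new schema adds a field ``f2`` with a default, so records written with the original schema can still be projected onto it (the field name and default value are illustrative):

.. codewithvars:: bash

   {"type":"record","name":"myrecord","fields":[
     {"name":"f1","type":"string"},
     {"name":"f2","type":"string","default":""}
   ]}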
If Hive integration is enabled, you must set ``schema.compatibility`` to ``BACKWARD``,
``FORWARD``, or ``FULL``. This ensures that the Hive table schema can be used to query all the data under
a topic, even when that data was written with different schemas. If ``schema.compatibility`` is set to ``BACKWARD`` or
``FULL``, the Hive table schema for a topic is equivalent to the latest schema in the HDFS files
under that topic, and that schema can query all of the topic's data. If ``schema.compatibility`` is
set to ``FORWARD``, the Hive table schema of a topic is equivalent to the oldest schema of the HDFS
files under that topic, and that schema can query all of the topic's data.
.. toctree::
:maxdepth: 1
:hidden:
configuration_options
changelog