Google Cloud Dataproc Sink Connector for Confluent Cloud

Note

If you are installing the connector locally for Confluent Platform, see Google Cloud Dataproc Sink Connector for Confluent Platform.

The Kafka Connect Google Cloud Dataproc Sink Connector integrates Apache Kafka® with managed HDFS instances in Google Cloud Dataproc. The connector periodically polls data from Kafka and writes this data to HDFS.

The Kafka Connect Google Cloud Dataproc sink connector integrates with Hive. When Hive integration is enabled, the connector automatically creates an external Hive partitioned table for each Kafka topic and updates the table according to the available data in HDFS.

Important

Once this connector moves from Preview to General Availability (GA), it will require a Confluent Cloud commitment for Confluent Cloud Enterprise customers. Without a Confluent Cloud commitment, Confluent Cloud Enterprise customers will not have access to this connector in GA. Contact your Confluent Account Executive to learn more and to update your subscription, if necessary.

Features

The Confluent Cloud Google Cloud Dataproc sink connector provides the following features:

  • Exactly Once Delivery: The connector uses a write-ahead log to ensure each record is exported to HDFS exactly once. The connector also manages offset commits by encoding the Kafka offset information into the file, so that it can resume from the last committed offsets after failures and task restarts.
  • Extensible Data Format: The connector supports writing data to HDFS in Avro, JSON, and String format.
  • Hive Integration: The connector supports Hive integration. When it is enabled, the connector automatically creates a Hive external partitioned table for each topic exported to HDFS.
  • Time-Based Partitioner: The connector supports a daily and hourly partitioner (an example of the resulting HDFS layout is shown after this list).
  • Seamless Dataproc Integration: The only connection requirements are the Google Cloud Platform credentials and the Dataproc cluster name and project. No need to get the HDFS URL or adjust a Hadoop configuration.
  • High Availability (HA) Cluster Support: No additional configuration is required to connect to a multi-master HA cluster.
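
As a rough illustration of how the time-based partitioner, output format, and write-ahead log fit together, the following sketch shows the kind of HDFS paths an HOURLY interval with Avro output typically produces. The topic name pageviews and the date are hypothetical, and the exact directory and file naming depends on your topics directory, logs directory, and partitioning settings:

/topics/pageviews/year=2021/month=03/day=15/hour=10/pageviews+0+0000000000+0000000999.avro
/logs/pageviews/0/log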

Caution

Preview connectors are not currently supported and are not recommended for production use. For specific connector limitations, see Cloud connector limitations.


Quick Start

Use this quick start to get up and running with the Confluent Cloud Google Cloud Dataproc sink connector. The quick start provides the basics of selecting the connector and configuring it to stream events to HDFS.

Prerequisites

Using the Confluent Cloud GUI

Complete the following steps to set up and run the connector using the Confluent Cloud GUI.

Step 1: Launch your Confluent Cloud cluster.

See the Confluent Cloud Quick Start for installation instructions.

Step 2: Add a connector.

Click Connectors > Add connector.

Add a connector

Step 3: Select your connector.

Click the Google Dataproc Sink connector icon.

Step 4: Enter the cluster details.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk (*) designates a required entry.

Complete the following and click Continue.

  1. Select the topic or topics containing the data you want to sink.
  2. Enter a Connector Name.
  3. Enter your Kafka Cluster credentials. The credentials are either the API key and secret or the service account API key and secret (one way to create these with the Confluent Cloud CLI is sketched after this list).
  4. Under Input Messages, select the message format used in the topic.
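
If you still need Kafka credentials for the connector, the following is a minimal sketch using the Confluent Cloud CLI. The cluster ID lkc-abc123 is a placeholder for your actual Kafka cluster ID; save the key and secret that are returned.

ccloud api-key create --resource lkc-abc123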

Step 5: Enter the destination details.

Important

Your GCP credentials are validated here. Make sure you enter these correctly.

  1. Copy and paste the contents of the downloaded GCP service account credentials JSON file into the GCP credentials field.
  2. Enter your Dataproc project ID and cluster name.
  3. Under HDFS details, enter the topics directory (optional) and the top-level HDFS directory where write-ahead logs are stored.
  4. Select the output format for messages delivered to Dataproc.
  5. Under Hive, select whether you want to use Hive integration and then add the details for your Hive configuration.
  6. Under Organize my data, enter the following properties:
    • Time interval: Select the time-based partitioning interval you want to use.
    • Flush size: A typical setting for this property is 1000. For example, with a flush size of 1000 and a topic that has six partitions, files start to be created in Dataproc once more than 1000 records exist in each partition. The default value of 1000 can be increased if needed.
  7. Enter the maximum number of tasks the connector can use. Refer to Confluent Cloud connector limitations for additional information.

Configuration properties that are not shown in the Confluent Cloud UI use the default values. For default values and property definitions, see Google Cloud Dataproc Sink Connector Configuration Properties.

Step 6: Launch the connector.

Verify the connection details and click Launch.

Launch the connector

Step 7: Check the connector status.

The status for the connector should go from Provisioning to Running.

Step 8: Check the Dataproc cluster.

Go to your Dataproc cluster and verify that records from the topic are being written to HDFS.

For additional information about this connector, see Google Cloud Dataproc Sink Connector for Confluent Platform. Note that not all Confluent Platform connector features are provided in the Confluent Cloud connector.

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud KSQL, see the Cloud ETL demo. The demo also shows how to use the Confluent Cloud CLI to manage your resources in Confluent Cloud.

Using the Confluent Cloud CLI

Complete the following steps to set up and run the connector using the Confluent Cloud CLI.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors.

Enter the following command to list available connectors:

ccloud connector-catalog list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

ccloud connector-catalog describe <connector-catalog-name>

For example:

ccloud connector-catalog describe DataprocSink

Example output:

Following are the required configs:
connector.class: DataprocSink
name
kafka.api.key
kafka.api.secret
topics
input.data.format
gcp.dataproc.credentials.json
gcp.dataproc.projectId
gcp.dataproc.cluster
gcp.dataproc.namenode
logs.dir
output.data.format
time.interval
tasks.max

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows required and optional connector properties.

{
  "connector.class": "DataprocSink",
  "name": "dataproc-test",
  "kafka.api.key": "<my-kafka-api-key>",
  "kafka.api.secret": "<my-kafka-api-secret>",
  "topics": "<topic-name>",
  "input.data.format": "AVRO",
  "gcp.dataproc.credentials.json": "<credentials-json-file-contents>",
  "gcp.dataproc.projectId": "<my-dataproc-project-ID",
  "gcp.dataproc.cluster": "<my-dataproc-cluster-name>",
  "gcp.dataproc.namenode": "<IP-address-of-the-namenode>",
  "logs.dir": "<HDFS-logs-directory>",
  "output.data.format": "AVRO",
  "flush.size": "1000",
  "time.interval": "HOURLY",
  "tasks.max": "1"
}

Note the following property definitions:

  • "name": Sets a name for your new connector.
  • "connector.class": Identifies the connector plugin name.
  • "topics": Identifies the topic name or a comma-separated list of topic names.
  • "data.format": Sets the message format. Valid entries are AVRO, JSON, or BYTES.
  • "gcp.dataproc.credentials.json": This contains the contents of the downloaded JSON file. See Formatting GCP credentials for details about how to format and use the contents of the downloaded credentials file.
  • "gcp.dataproc.namenode": For VPC-peered environments, this is the internal IP address or URL of the HDFS namenode. For non-VPC-peered environments, this should be either a public IP address or publicly accessible URL.
  • "logs.dir": This is the the top-level HDFS directory where write-ahead logs are stored.
  • "flush.size": (Optional) If not entered, this property value defaults to 1000. For example, if you use the value 1000 and your topic has six partitions, files start to be created in Dataproc after more than 1000 records exist in each partition. Note that the default value of 1000 can be increased if needed.
  • "time.interval": Sets how your messages are grouped. Valid entries are DAILY or HOURLY.

Configuration properties that are not listed use the default values. For default values and property definitions, see Google Cloud Dataproc Sink Connector Configuration Properties.

Formatting GCP credentials

The contents of the downloaded credentials file must be converted to string format before it can be used in the connector configuration.

  1. Convert the JSON file contents into string format. You can use an online converter tool to do this (for example, JSON to String Online Converter), or the command-line sketch shown after this procedure.

  2. Add the escape character \ before all \n entries in the Private Key section so that each entry becomes \\n (see the highlighted lines below). The example below has been formatted so that the \\n entries are easier to see. Most of the credentials key has been omitted.

    Tip

    A script is available that converts the credentials to a string and also adds additional \ escape characters where needed. See Stringify GCP Credentials.

      {
          "connector.class": "DataprocSink",
          "name": "dataproc-sink",
          "kafka.api.key": "<my-kafka-api-key>",
          "kafka.api.secret": "<my-kafka-api-secret>",
          "topics": "<topic-name>",
          "data.format": "AVRO",
          "gcp.dataproc.credentials.json" : "{\"type\":\"service_account\",\"project_id\":\"connect-
          1234567\",\"private_key_id\":\"omitted\",
          \"private_key\":\"-----BEGIN PRIVATE KEY-----
          \\nMIIEvAIBADANBgkqhkiG9w0BA
          \\n6MhBA9TIXB4dPiYYNOYwbfy0Lki8zGn7T6wovGS5pzsIh
          \\nOAQ8oRolFp\rdwc2cC5wyZ2+E+bhwn
          \\nPdCTW+oZoodY\\nOGB18cCKn5mJRzpiYsb5eGv2fN\/J
          \\n...rest of key omitted...
          \\n-----END PRIVATE KEY-----\\n\",
          \"client_email\":\"pub-sub@connect-123456789.iam.gserviceaccount.com\",
          \"client_id\":\"123456789\",\"auth_uri\":\"https:\/\/accounts.google.com\/o\/oauth2\/
          auth\",\"token_uri\":\"https:\/\/oauth2.googleapis.com\/
          token\",\"auth_provider_x509_cert_url\":\"https:\/\/
          www.googleapis.com\/oauth2\/v1\/
          certs\",\"client_x509_cert_url\":\"https:\/\/www.googleapis.com\/
          robot\/v1\/metadata\/x509\/pub-sub%40connect-
          123456789.iam.gserviceaccount.com\"}",
          "gcp.dataproc.projectId": "<my-dataproc-project-ID",
          "gcp.dataproc.region": "<gcp-region>",
          "gcp.dataproc.cluster": "<my-dataproc-cluster-name>",
          "logs.dir": "<HDFS-logs-directory>",
          "flush.size": "1000",
          "time.interval": "HOURLY",
          "tasks.max": "1"
      }
    
  3. Add all the converted string content to the "gcp.dataproc.credentials.json" section of your configuration file as shown in the example above.
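
If you prefer the command line over an online converter, the following is a minimal sketch using jq, assuming the downloaded credentials file is saved as credentials.json. It prints the file as a single JSON-encoded string, escaping the quotes and doubling the \n entries in the private key to \\n:

jq -Rs . credentials.json

Paste the resulting string, including the surrounding quotes, as the value of "gcp.dataproc.credentials.json".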

Step 4: Load the configuration file and create the connector.

Enter the following command to load the configuration and start the connector:

ccloud connector create --config <file-name>.json

For example:

ccloud connector create --config dataproc-sink-config.json

Example output:

Created connector dataproc-sink jtt-ix4dl

Step 5: Check the connector status.

Enter the following command to check the connector status:

ccloud connector list

Example output:

ID          |       Name      | Status  | Type
+-----------+-----------------+---------+------+
jtt-ix4dl   | dataproc-sink   | RUNNING | sink
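
To view the status and configuration details for a single connector, you can also describe it by ID. For example, using the connector ID from the output above:

ccloud connector describe jtt-ix4dl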

Step 6: Check the Dataproc cluster.

Go to your Dataproc cluster and verify that records from the topic are being written to HDFS.
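
One way to verify this from the command line is to SSH to the Dataproc master node and list the HDFS directories that the connector writes to. The following is a minimal sketch that assumes the default master node name <my-dataproc-cluster-name>-m, the default topics directory, and the logs.dir value from your configuration:

# Open an SSH session on the cluster's master node
gcloud compute ssh <my-dataproc-cluster-name>-m --project <my-dataproc-project-ID> --zone <cluster-zone>

# On the master node, list the data files and write-ahead logs written by the connector
hdfs dfs -ls /topics/<topic-name>
hdfs dfs -ls /<HDFS-logs-directory>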

For additional information about this connector, see Google Cloud Dataproc Sink Connector for Confluent Platform. Note that not all Confluent Platform connector features are provided in the Confluent Cloud connector.

Next Steps

Try out a Confluent Cloud demo.

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud KSQL, see the Cloud ETL demo. The demo also shows how to use the Confluent Cloud CLI to manage your resources in Confluent Cloud.