Databricks Delta Lake Sink Connector Confluent Platform

Note

If you are using Confluent Cloud, see Databricks Delta Lake Sink Connector for Confluent Cloud for the cloud Quick Start.

The Databricks Delta Lake Sink connector for Confluent Platform periodically polls data from Apache Kafka® and copies the data into an Amazon S3 staging bucket, and then commits these records to a Databricks Delta Lake instance.

Note

  • You must have a Databricks Delta Lake instance on AWS and an S3 bucket ready.
  • The connector appends data only.
  • Data is staged in an Amazon S3 bucket. If you delete any files in the S3 bucket, you lose exactly-once semantics (EOS).
  • The connector adds a field named partition. Your Delta Lake table must include a field named partition using type INT (partition INT).
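
For example, a minimal Delta table definition satisfying this requirement might look like the following sketch (the table name and the columns other than partition are illustrative):

```sql
-- Illustrative table: only the partition INT column is required by the connector.
CREATE TABLE pageviews (
  viewtime BIGINT,
  userid STRING,
  pageid STRING,
  partition INT
) USING DELTA;
```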

Features

The Databricks Delta Lake Sink connector provides the following features:

Supports one task

The Databricks Delta Lake Sink connector for Confluent Platform supports running only one task.

Multiple data formats

The connector supports input data from Kafka topics in Avro, JSON Schema, and Protobuf formats. You must enable Schema Registry to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf).
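For reference, the worker-side converter settings that correspond to these formats look like the following illustrative fragment (the Schema Registry URL is an example):

```json
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}
```

For JSON Schema or Protobuf input, substitute io.confluent.connect.json.JsonSchemaConverter or io.confluent.connect.protobuf.ProtobufConverter, respectively.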

Table automation

If you do not provide a table name, the connector creates a table using the originating Kafka topic name (that is, the configuration property defaults to ${topic}).

Limitations

The Databricks Delta Lake Sink Connector for Confluent Platform has the following limitations:

  • Data is staged in an Amazon S3 bucket.
  • The Databricks Delta Lake Sink connector does not currently support exactly-once semantics.
  • You can’t configure multiple connectors that consume from the same topic and use the same Amazon S3 staging bucket.
  • Multi-task support is unavailable for Databricks Delta Lake versions 10.5 and earlier; with these versions, you can configure only a single task.
  • The connector appends data only.
  • The connector uses the UTC timezone.
  • The connector supports running one task per connector instance.
  • The connector does not currently support any Single Message Transformations (SMTs) that modify the topic name. Specifically, the following transformations are not allowed:
    • org.apache.kafka.connect.transforms.TimestampRouter
    • io.confluent.connect.transforms.MessageTimestampRouter
    • io.confluent.connect.transforms.ExtractTopic$Key
    • io.confluent.connect.transforms.ExtractTopic$Value
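
By contrast, SMTs that leave the topic name untouched are supported. For example, a hypothetical configuration fragment that inserts a static field into each record value (the transform alias and field values are illustrative):

```json
{
  "transforms": "addSource",
  "transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addSource.static.field": "data_source",
  "transforms.addSource.static.value": "kafka"
}
```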

Prerequisites

  • All Databricks Delta Lake and AWS CloudFormation procedures completed. See Set up Databricks Delta Lake (AWS).
  • The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
  • You must enable Schema Registry to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • An AWS IAM Policy for S3 configured for bucket access.
  • An AWS account configured with Access Keys. You use these access keys when setting up the connector.
  • Download a JDBC driver.

Install the Databricks Delta Lake Sink Connector for Confluent Platform

You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Prerequisites

Note

You must install the connector on every machine where Connect will run.

  • An install of the Confluent Hub Client.

    Note

    This is installed by default with Confluent Enterprise.

  • An installation of the latest (latest) connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent-hub install confluentinc/kafka-connect-databricks-delta-lake:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent-hub install confluentinc/kafka-connect-databricks-delta-lake:1.0.1
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, contact Confluent Support at support@confluent.io for more information.

Configuration Properties

For a complete list of configuration properties for this connector, see Databricks Delta Lake Sink Configuration Properties.

Downloading a JDBC Driver

The Databricks Delta Lake Sink connector uses the Java Database Connectivity (JDBC) API, which enables applications to connect to and use a wide range of database systems. For this to work, the connector must have a JDBC driver for Databricks Delta Lake.

Before you use the connector with Databricks Delta Lake, you must install the most recent JDBC driver for it. The basic steps are:

  1. Download the JDBC driver JAR (SimbaSparkJDBC42-2.6.22.1040.zip) from https://databricks.com/spark/jdbc-drivers-archive.
  2. Place the JAR file into the directory where your connector is installed or the directory specified in plugin.path in your Confluent Platform installation on each of the Connect worker nodes.
  3. Restart all the Connect worker nodes.
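
The steps above can be sketched as a quick check to run on each worker (the default directory and the JAR name pattern below are assumptions; substitute a directory from your worker's plugin.path or the connector's own lib/ directory):

```shell
# Check whether the Simba Spark JDBC driver JAR is visible to this Connect
# worker. PLUGIN_DIR is an assumed example path, and the *JDBC42*.jar name
# pattern is an assumption based on the driver ZIP name above.
PLUGIN_DIR="${PLUGIN_DIR:-/usr/share/confluent-hub-components/confluentinc-kafka-connect-databricks-delta-lake/lib}"
if ls "$PLUGIN_DIR"/*JDBC42*.jar >/dev/null 2>&1; then
  echo "JDBC driver found in $PLUGIN_DIR"
else
  echo "JDBC driver missing; download it from the Databricks JDBC driver archive" >&2
fi
```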

Quick Start

Important

Be sure to review and complete the tasks in Set up Databricks Delta Lake (AWS) before configuring the connector.

Use this quick start to get up and running with the Databricks Delta Lake Sink connector for Confluent Platform. The quick start provides the basics of selecting the connector and configuring it to stream data.

Configure the connector using the Confluent CLI

Complete the following steps to set up and run the connector using the Confluent CLI.

Note

  • Make sure you have all your prerequisites completed.
  • The example commands use Confluent CLI version 2. For more information, see Confluent CLI v2.

Step 1: Start Confluent Platform

  1. If not yet running, start Confluent Platform.

    confluent local services start
    

Step 2: Create the connector configuration file.

  1. Create the following connector configuration JSON file and save the file as databricks.json. The example assumes the pageviews topic is created by the Datagen Source connector with the PAGEVIEWS example, using AvroConverter for the Kafka record value and StringConverter for the Kafka record key. The pageviews topic is mapped to the pageviews table on Databricks Delta Lake.

    {
      "name": "DatabricksDeltaLakeSink",
      "config":{
        "topics": "pageviews",
        "connector.class": "io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkConnector",
        "name": "DatabricksDeltaLakeSink",
        "s3.region": "<region>",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "confluent.topic.bootstrap.servers":"localhost:9092",
        "delta.lake.host.name": "<delta-lake-host-name e.g. dbc-e12345cd-e12345ed.cloud.databricks.com>",
    "delta.lake.http.path": "<delta-lake-http-path e.g. sql/protocolv1/o/1234567891811460/0000-01234-str6jlpz>",
        "delta.lake.token": "************************************",
        "delta.lake.topic2table.map": "pageviews:pageviews",
        "delta.lake.table.auto.create": "true",
        "staging.s3.access.key.id": "********************",
        "staging.s3.secret.access.key": "****************************************",
        "staging.bucket.name": "databricks",
        "flush.interval.ms": "300000",
        "tasks.max": "1"
       }
    }
    

Note the following required property definitions:

  • name: Sets a name for your new connector.
  • connector.class: Identifies the connector plugin name.
  • topics: Enter the topic name or a comma-separated list of topic names.
  • s3.region: Sets the AWS region to be used by the connector.
  • input.data.format: Sets the input Kafka record value format (data coming from the Kafka topic). Valid formats are Avro, JSON_SR (JSON Schema), or Protobuf.
  • delta.lake....: See the Databricks Delta Lake setup procedure for where you can get this information. See Databricks Delta Lake Sink Configuration Properties for additional property values and descriptions.
  • staging....: These properties use the information you get from Databricks and AWS. See the Databricks Delta Lake setup procedure for more details.
  • flush.interval.ms: The time interval in milliseconds (ms) at which the connector periodically invokes file commits. The commit time is adjusted to align with 00:00 UTC, and the commit is performed at the scheduled time regardless of the last commit time or number of messages. This configuration is useful when you have to commit your data based on current server time, such as at the beginning of each hour. The default value is 300,000 ms (5 minutes).
  • tasks.max: Enter the maximum number of tasks for the connector to use. The connector supports running one task per connector instance.
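
The UTC alignment described for flush.interval.ms can be illustrated with ordinary shell arithmetic (this is an illustration of the scheduling rule, not connector code):

```shell
# With flush.interval.ms=300000, commits land on 5-minute boundaries
# measured from 00:00 UTC, regardless of when the connector started.
INTERVAL_MS=300000
NOW_MS=$(( $(date -u +%s) * 1000 ))
# Next commit: the next multiple of the interval after the current time.
NEXT_COMMIT_MS=$(( (NOW_MS / INTERVAL_MS + 1) * INTERVAL_MS ))
echo "next commit in $(( (NEXT_COMMIT_MS - NOW_MS) / 1000 )) seconds"
```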

Single Message Transforms: See the Single Message Transformations (SMTs) documentation for details about adding SMTs using the CLI. See Databricks Delta Lake Sink Configuration Properties for configuration property values and descriptions.

Step 3: Load the properties file and create the connector.

Enter the following command to load the configuration and start the connector:

confluent local services connect connector load DatabricksDeltaLakeSink -c databricks.json

If you prefer to use the Connect REST API, enter the following command:

curl -s -H "Content-Type: application/json" -X POST -d @databricks.json http://localhost:8083/connectors/

Step 4: Check the connector status.

Enter the following command to check the connector status:

confluent local services connect connector status DatabricksDeltaLakeSink

If you prefer to use the Connect REST API, enter the following command:

curl -X GET http://localhost:8083/connectors/DatabricksDeltaLakeSink/status -H "Content-Type: application/json"

Step 5: Check the S3 bucket.

Verify that records are arriving in the staging Amazon S3 bucket and then being committed to the Databricks Delta Lake table.