Databricks Delta Lake Sink Connector for Confluent Platform

The Databricks Delta Lake Sink connector for Confluent Platform periodically polls data from Apache Kafka® and copies the data into an Amazon S3 staging bucket, and then commits these records to a Databricks Delta Lake instance.

It is important to note the following:

  • If you have one or more access policies for S3 storage that include a condition for network address translation (NAT) IPs, you must update your policies to also include Databricks’ VPC IDs for the underlying S3 gateway endpoints. For help with making this change and for an example of the S3 policy, see the Databricks Community page.
  • You must have a Databricks Delta Lake instance on AWS and an S3 bucket ready.
  • The connector appends data only.
  • Data is staged in an Amazon S3 bucket. If you delete any files in the S3 bucket, you will lose exactly-once semantics (EOS).
  • The connector adds a field named partition. Your Delta Lake table must include a field named partition using type INT (partition INT).

Features

The Databricks Delta Lake Sink connector provides the following features:

Supports multiple tasks

Versions 1.0.5 and later of the Databricks Delta Lake Sink connector support running multiple tasks. Versions earlier than 1.0.5 support running only one task.

Multiple data formats

The connector supports input data from Kafka topics in Avro, JSON Schema, and Protobuf formats. You must enable Schema Registry to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf).

Topic automation

If you do not provide a table name, the connector can create a table using the originating Kafka topic name (that is, the configuration property defaults to ${topic}).
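The fallback described above can be sketched in a few lines of Python. This is an illustration only, not the connector's implementation: resolve_table is a hypothetical helper, and it assumes the mapping string uses comma-separated topic:table pairs, as suggested by the "pageviews:pageviews" value of delta.lake.topic2table.map in the quick start configuration.

```python
def resolve_table(topic, topic2table_map=""):
    """Return the table mapped to `topic`, or the topic name itself if unmapped.

    Hypothetical helper. Assumes `topic2table_map` looks like
    "topic1:table1,topic2:table2" (an assumption based on the quick start value).
    """
    mapping = {}
    for entry in topic2table_map.split(","):
        if ":" in entry:
            mapped_topic, table = entry.split(":", 1)
            mapping[mapped_topic.strip()] = table.strip()
    # No explicit mapping: fall back to the topic name (the ${topic} default).
    return mapping.get(topic, topic)


print(resolve_table("pageviews", "pageviews:pageviews"))
print(resolve_table("orders"))
```

A topic with no entry in the map resolves to a table with the same name as the topic, which mirrors the ${topic} default.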

Limitations

The Databricks Delta Lake Sink connector for Confluent Platform has the following limitations:

  • Data is staged in an Amazon S3 bucket.
  • The Databricks Delta Lake Sink connector does not currently support exactly-once semantics.
  • You can’t configure multiple connectors that consume from the same topic and use the same Amazon S3 staging bucket.
  • Multi-task support is unavailable for Databricks Delta Lake Sink connector versions earlier than 1.0.5. With those versions, you can configure a single task only.
  • The connector appends data only.
  • The connector uses the UTC timezone.
  • The connector does not currently support any Single Message Transformations (SMTs) that modify the topic name. Specifically, the following transformations are not allowed:
    • org.apache.kafka.connect.transforms.TimestampRouter
    • io.confluent.connect.transforms.MessageTimestampRouter
    • io.confluent.connect.transforms.ExtractTopic$Key
    • io.confluent.connect.transforms.ExtractTopic$Value
  • The connector does not currently support complex data types: Array, Map, Union, and Struct. Confluent recommends using a supported data type: Int, Long, Float, Double, Boolean, String, or Byte.
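Because the SMT restriction above is easy to overlook, it can help to check a connector configuration before submitting it. The following is a minimal sketch, not a Confluent tool: find_disallowed_smts is a hypothetical helper that relies only on the standard Kafka Connect keys transforms and transforms.<alias>.type, and the sample configuration is illustrative.

```python
# Classes the document lists as not allowed because they modify the topic name.
DISALLOWED_SMTS = {
    "org.apache.kafka.connect.transforms.TimestampRouter",
    "io.confluent.connect.transforms.MessageTimestampRouter",
    "io.confluent.connect.transforms.ExtractTopic$Key",
    "io.confluent.connect.transforms.ExtractTopic$Value",
}


def find_disallowed_smts(config):
    """Return (alias, class) pairs for configured SMTs that are not allowed.

    Hypothetical helper; `config` is the flat "config" map of a connector.
    """
    aliases = [a.strip() for a in config.get("transforms", "").split(",") if a.strip()]
    found = []
    for alias in aliases:
        smt_class = config.get(f"transforms.{alias}.type", "")
        if smt_class in DISALLOWED_SMTS:
            found.append((alias, smt_class))
    return found


# Illustrative configuration: one disallowed SMT and one that is unaffected.
config = {
    "transforms": "route,mask",
    "transforms.route.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
}
print(find_disallowed_smts(config))
```

An empty result means none of the four listed transformations is configured. It does not prove that the remaining SMTs leave the topic name untouched.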

Install the Databricks Delta Lake Sink connector for Confluent Platform

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • All Databricks Delta Lake and AWS CloudFormation procedures completed. For more details, see Set up Databricks Delta Lake (AWS) Sink Connector for Confluent Platform.

  • The Confluent CLI installed and configured for the cluster. For more help, see Install the Confluent CLI.

  • You must enable Schema Registry to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).

  • An AWS IAM Policy for S3 configured for bucket access.

  • An AWS account configured with Access Keys. You use these access keys when setting up the connector.

  • Download a JDBC driver.

  • An install of the Confluent Hub Client. This is installed by default with Confluent Enterprise.

  • An install of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-databricks-delta-lake:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-databricks-delta-lake:1.0.7
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

Configuration properties

For a complete list of configuration properties for this connector, see Configuration Reference for Databricks Delta Lake Sink for Confluent Platform.

Downloading a JDBC Driver

The Databricks Delta Lake Sink connector uses the Java Database Connectivity (JDBC) API which enables applications to connect to and use a wide range of database systems. In order for this to work, the connector must have a JDBC driver for Databricks Delta Lake.

Before you use the connector with Databricks Delta Lake, you must install the most recent JDBC driver for it. The basic steps are:

  1. Download the JDBC driver archive (SimbaSparkJDBC42-2.6.22.1040.zip) from https://databricks.com/spark/jdbc-drivers-archive and extract it.

  2. Place the extracted JAR file into the connector's lib directory as shown in the following example:

    <path-to-the-connector>/confluentinc-kafka-connect-databricks-delta-lake/lib/SparkJDBC42.jar
    
  3. Restart all the Connect worker nodes.

Quick start

Use this quick start to get up and running with the Databricks Delta Lake Sink connector for Confluent Platform. The quick start provides the basics of selecting the connector and configuring it to stream data.

Before moving forward, ensure you have completed the prerequisites listed earlier in this document.

Configure the connector using the Confluent CLI

To configure the connector using the Confluent CLI, complete the following steps. Note that the example commands in this quick start use the latest version of the Confluent CLI.

Step 1: Start Confluent Platform

  1. If not yet running, start Confluent Platform.

    confluent local services start
    

Step 2: Create the connector configuration file.

  1. Create the following connector configuration JSON file and save the file as databricks.json. The example assumes the pageviews topic is created by the Datagen Source connector with the PAGEVIEWS example, using AvroConverter for the Kafka record value and StringConverter for the Kafka record key. The pageviews topic will be mapped to the pageviews table on Databricks Delta Lake.

    {
      "name": "DatabricksDeltaLakeSink",
      "config":{
        "topics": "pageviews",
        "connector.class": "io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkConnector",
        "name": "DatabricksDeltaLakeSink",
        "s3.region": "<region>",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "confluent.topic.bootstrap.servers":"localhost:9092",
        "delta.lake.host.name": "<delta-lake-host-name e.g. dbc-e12345cd-e12345ed.cloud.databricks.com>",
        "delta.lake.http.path": "<delta-lake-http-path e.g. sql/protocolv1/o/1234567891811460/0000-01234-str6jlpz>",
        "delta.lake.token": "************************************",
        "delta.lake.topic2table.map": "pageviews:pageviews",
        "delta.lake.table.auto.create": "true",
        "staging.s3.access.key.id": "********************",
        "staging.s3.secret.access.key": "****************************************",
        "staging.bucket.name": "databricks",
        "flush.interval.ms": "300000",
        "tasks.max": "1"
      }
    }
    

Note the following required property definitions:

  • name: Sets a name for your new connector.
  • connector.class: Identifies the connector plugin name.
  • topics: Enter the topic name or a comma-separated list of topic names.
  • s3.region: Sets the AWS region to be used by the connector.
  • input.data.format: Sets the input Kafka record value format (data coming from the Kafka topic). Valid formats are Avro, JSON_SR (JSON Schema), or Protobuf.
  • delta.lake....: See the Databricks Delta Lake setup procedure for where you can get this information. See Configuration Reference for Databricks Delta Lake Sink for Confluent Platform for additional property values and descriptions.
  • staging....: These properties use the information you get from Databricks and AWS. See the Databricks Delta Lake setup procedure for more details.
  • flush.interval.ms: The time interval in milliseconds (ms) to periodically invoke file commits. This property ensures the connector invokes file commits at every configured interval. The commit time is adjusted to 00:00 UTC. The commit is performed at the scheduled time, regardless of the last commit time or number of messages. This configuration is useful when you have to commit your data based on current server time, like at the beginning of each hour. The default value is 300,000 ms (5 minutes).
  • tasks.max: Enter the maximum number of tasks for the connector to use. Connector versions earlier than 1.0.5 support running only one task per connector instance.
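To make the flush.interval.ms behavior more concrete, the following sketch computes when the next scheduled commit would fall. It is one reading of the description above, not the connector's code: it assumes commit boundaries sit at whole multiples of the interval counted from 00:00 UTC, and next_commit_time is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def next_commit_time(now, flush_interval_ms):
    """Return the first interval boundary strictly after `now`.

    Hypothetical helper. Assumes boundaries are whole multiples of the
    interval counted from 00:00 UTC. `now` should be timezone-aware.
    """
    now = now.astimezone(timezone.utc)
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    interval = timedelta(milliseconds=flush_interval_ms)
    # Whole intervals that have already elapsed since midnight UTC.
    intervals_done = (now - midnight) // interval
    return midnight + (intervals_done + 1) * interval


now = datetime(2024, 1, 1, 10, 7, 30, tzinfo=timezone.utc)
print(next_commit_time(now, 300000))   # default 5-minute interval
print(next_commit_time(now, 3600000))  # hourly interval
```

Under this assumption, the default interval places the next commit after 10:07:30 UTC at 10:10:00 UTC, and an hourly interval places it at 11:00:00 UTC, independent of when the previous commit happened.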

Single Message Transforms: For details about adding SMTs using the CLI, see the Single Message Transformations (SMTs) documentation. For configuration property values and descriptions, see Configuration Reference for Databricks Delta Lake Sink for Confluent Platform.

Step 3: Load the configuration file and create the connector.

Enter the following command to load the configuration and start the connector:

confluent local services connect connector load DatabricksDeltaLakeSink -c databricks.json

If you prefer to use the Connect REST API, enter the following command:

curl -s -H "Content-Type: application/json" -X POST -d @databricks.json http://localhost:8083/connectors/

Step 4: Check the connector status.

Enter the following command to check the connector status:

confluent local services connect connector status DatabricksDeltaLakeSink

If you prefer to use the Connect REST API, enter the following command:

curl -X GET http://localhost:8083/connectors/DatabricksDeltaLakeSink/status -H "Content-Type: application/json"
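The status endpoint returns JSON, so the check can also be scripted. The following is a minimal sketch using only the Python standard library. The helper names fetch_status and summarize_status are hypothetical, the base URL is the local worker address used in the commands above, and the sample payload only illustrates the general shape of a Connect status response.

```python
import json
import urllib.request


def summarize_status(status):
    """Return (connector_state, [(task_id, task_state), ...]) from a status payload."""
    connector_state = status["connector"]["state"]
    tasks = [(task["id"], task["state"]) for task in status.get("tasks", [])]
    return connector_state, tasks


def fetch_status(name, base_url="http://localhost:8083"):
    """GET /connectors/<name>/status and decode the JSON body.

    Needs a running Connect worker, so it is defined here but not called.
    """
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


# Illustrative payload; the values are made up for the example.
sample = {
    "name": "DatabricksDeltaLakeSink",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
    "type": "sink",
}
print(summarize_status(sample))
```

With a worker running, summarize_status(fetch_status("DatabricksDeltaLakeSink")) would report the connector state and the state of each task. A task in the FAILED state is usually the first thing to look for.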

Step 5: Check the S3 bucket.

Check that records are populating the staging Amazon S3 bucket and then populating the Databricks Delta Lake table.