Set up Databricks Delta Lake (AWS)

To support exactly-once semantics, you must create a separate Amazon S3 bucket where data can be staged before delivery into Databricks Delta Lake. The following steps show how to create a new Databricks workspace and add the S3 staging bucket you need. These steps assume you have working knowledge of Databricks and AWS CloudFormation. For detailed information about Databricks Delta Lake, see the Databricks documentation.

Note

The following steps include Databricks Delta Lake and AWS CloudFormation UI screen captures. If any of the screen captures appear outdated, report them to the Confluent docs team.

Step 1: Create the Databricks workspace

Important

Be sure to create Databricks Delta Lake workspace resources in the same region where your Kafka cluster is running.

  1. Create a Databricks workspace by selecting Quickstart. The workspace resources are created in AWS CloudFormation.

    [Screen capture: Select Quickstart]

  2. On the AWS CloudFormation UI, use the defaults and create the workspace resources. The resources will take a few minutes to build.

    [Screen capture: AWS CloudFormation]

    Once the resources are created, the workspace is available on the Databricks console.

    [Screen capture: Databricks workspace name]

  3. Click Open on the Databricks console.

    [Screen capture: Open the workspace]

Keep the Databricks console open and switch to the AWS Management Console.

Step 2: Create the Amazon S3 bucket and policies

  1. Create an Amazon S3 bucket. This is the staging bucket used by the connector. If you prefer to script this step and the two that follow, see the example sketch after this list.

  2. Go to the AWS IAM dashboard and create a role for S3 access. Add an inline policy to the role using the following JSON.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "s3:ListBucket"
          ],
         "Resource": [
            "arn:aws:s3:::<bucket-name>"
          ]
        },
        {
          "Effect": "Allow",
          "Action": [
            "s3:PutObject",
            "s3:GetObject",
            "s3:DeleteObject",
            "s3:PutObjectAcl"
          ],
          "Resource": [
             "arn:aws:s3:::<bucket-name>/*"
          ]
        }
      ]
    }
    
  3. Create a bucket policy on the staging S3 bucket using the following JSON.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "Example permissions",
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::<aws-account-id-databricks>:role/<iam-role-for-s3-access>"
          },
          "Action": [
            "s3:GetBucketLocation",
            "s3:ListBucket"
          ],
          "Resource": "arn:aws:s3:::<bucket-name>"
        },
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::<aws-account-id-databricks>:role/<iam-role-for-s3-access>"
          },
          "Action": [
            "s3:PutObject",
            "s3:GetObject",
            "s3:DeleteObject",
            "s3:PutObjectAcl"
          ],
          "Resource": "arn:aws:s3:::<bucket-name>/*"
        }
      ]
    }
    

See the Databricks AWS documentation for additional details.
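
If you prefer to script this step, the following is a minimal boto3 sketch. It assumes AWS credentials are configured locally, that the IAM role from item 2 already exists, and that the two policy documents above are saved as role-policy.json and bucket-policy.json. The file names, the inline policy name, and the region value are placeholders for this sketch only; they are not part of the procedure.

# Minimal sketch of the staging bucket setup (assumptions noted above).
import boto3

REGION = "us-west-2"                    # placeholder: use your Kafka cluster's region
BUCKET = "<bucket-name>"                # staging bucket name
ROLE_NAME = "<iam-role-for-s3-access>"  # role created for S3 access

s3 = boto3.client("s3", region_name=REGION)
iam = boto3.client("iam")

# Create the staging bucket (omit CreateBucketConfiguration for us-east-1).
s3.create_bucket(
    Bucket=BUCKET,
    CreateBucketConfiguration={"LocationConstraint": REGION},
)

# Attach the inline role policy shown in item 2 (saved as role-policy.json).
with open("role-policy.json") as f:
    iam.put_role_policy(
        RoleName=ROLE_NAME,
        PolicyName="databricks-s3-staging-access",  # placeholder policy name
        PolicyDocument=f.read(),
    )

# Apply the bucket policy shown in item 3 (saved as bucket-policy.json).
with open("bucket-policy.json") as f:
    s3.put_bucket_policy(Bucket=BUCKET, Policy=f.read())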

Step 3: Update the workspace configuration role policy

  1. Locate the IAM role used to create the Databricks deployment by clicking the Databricks workspace name.

    [Screen capture: Databricks workspace name]

  2. Get the Role ARN to use in the next step.

    [Screen capture: Role ARN]

  3. Go to the AWS IAM dashboard and update the policy of the role identified by the Role ARN from the previous step. Use the following JSON.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "Stmt1403287045000",
          "Effect": "Allow",
          "Action": [
            "ec2:AssociateDhcpOptions",
            "ec2:AssociateIamInstanceProfile",
            "ec2:AssociateRouteTable",
            "ec2:AttachInternetGateway",
            "ec2:AttachVolume",
            "ec2:AuthorizeSecurityGroupEgress",
            "ec2:AuthorizeSecurityGroupIngress",
            "ec2:CancelSpotInstanceRequests",
            "ec2:CreateDhcpOptions",
            "ec2:CreateInternetGateway",
            "ec2:CreateKeyPair",
            "ec2:CreateRoute",
            "ec2:CreateSecurityGroup",
            "ec2:CreateSubnet",
            "ec2:CreateTags",
            "ec2:CreateVolume",
            "ec2:CreateVpc",
            "ec2:CreateVpcPeeringConnection",
            "ec2:DeleteInternetGateway",
            "ec2:DeleteKeyPair",
            "ec2:DeleteRoute",
            "ec2:DeleteRouteTable",
            "ec2:DeleteSecurityGroup",
            "ec2:DeleteSubnet",
            "ec2:DeleteTags",
            "ec2:DeleteVolume",
            "ec2:DeleteVpc",
            "ec2:DescribeAvailabilityZones",
            "ec2:DescribeIamInstanceProfileAssociations",
            "ec2:DescribeInstanceStatus",
            "ec2:DescribeInstances",
            "ec2:DescribePrefixLists",
            "ec2:DescribeReservedInstancesOfferings",
            "ec2:DescribeRouteTables",
            "ec2:DescribeSecurityGroups",
            "ec2:DescribeSpotInstanceRequests",
            "ec2:DescribeSpotPriceHistory",
            "ec2:DescribeSubnets",
            "ec2:DescribeVolumes",
            "ec2:DescribeVpcs",
            "ec2:DetachInternetGateway",
            "ec2:DisassociateIamInstanceProfile",
            "ec2:ModifyVpcAttribute",
            "ec2:ReplaceIamInstanceProfileAssociation",
            "ec2:RequestSpotInstances",
            "ec2:RevokeSecurityGroupEgress",
            "ec2:RevokeSecurityGroupIngress",
            "ec2:RunInstances",
            "ec2:TerminateInstances"
          ],
          "Resource": [
            "*"
          ]
        },
        {
          "Effect": "Allow",
          "Action": "iam:PassRole",
          "Resource": "arn:aws:iam::<aws-account-id-databricks>:role/<iam-role-for-s3-access>"
        },
        {
          "Effect": "Allow",
          "Action": [
            "iam:CreateServiceLinkedRole",
            "iam:PutRolePolicy"
          ],
          "Resource": "arn:aws:iam::*:role/aws-service-role/spot.amazonaws.com/AWSServiceRoleForEC2Spot",
          "Condition": {
            "StringLike": {
              "iam:AWSServiceName": "spot.amazonaws.com"
            }
          }
        }
      ]
    }
    

Step 4: Add the instance profile and create a cluster

  1. Locate the instance profile ARN for the S3 bucket role created in a previous step.

    [Screen capture: Instance profile ARN]

  2. Go to the Databricks Admin Console and add an instance profile. To add it through the Databricks REST API instead, see the sketch after this list.

    [Screen capture: Admin Console]

    [Screen capture: Add Instance Profile]

  3. Create a cluster in Databricks using the instance profile added in the previous step.

    [Screen capture: Create Cluster]
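
If you prefer to add the instance profile through the Databricks REST API rather than the Admin Console, the following is a minimal sketch. It assumes the Python requests package, a workspace admin personal access token (Step 7 shows where tokens are generated), and the instance profile ARN from the first item above; the variable names and values are placeholders.

# Minimal sketch: register an instance profile with the Databricks workspace.
import requests

WORKSPACE_URL = "https://dbc-12345df33-0e53.cloud.databricks.com"   # your workspace URL
TOKEN = "<databricks-admin-token>"                                  # admin personal access token
INSTANCE_PROFILE_ARN = "arn:aws:iam::<aws-account-id>:instance-profile/<profile-name>"

resp = requests.post(
    f"{WORKSPACE_URL}/api/2.0/instance-profiles/add",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"instance_profile_arn": INSTANCE_PROFILE_ARN},
)
resp.raise_for_status()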

Step 5: Create a new user on AWS

Create a new AWS IAM user for the connector and attach the following policy to the user. This policy is the same as the one used for the Amazon S3 sink connector, with the addition of "s3:DeleteObject". The connector requires DeleteObject so that it can delete files from the staging S3 bucket after they are copied to the Delta Lake table. If you prefer to script this step, see the sketch after the policy.

{
   "Version":"2012-10-17",
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "s3:ListAllMyBuckets"
         ],
         "Resource":"arn:aws:s3:::*"
      },
      {
         "Effect":"Allow",
         "Action":[
            "s3:ListBucket",
            "s3:GetBucketLocation"
         ],
         "Resource":"arn:aws:s3:::<bucket-name>"
      },
      {
         "Effect":"Allow",
         "Action":[
            "s3:PutObject",
            "s3:GetObject",
            "s3:AbortMultipartUpload",
            "s3:ListMultipartUploadParts",
            "s3:ListBucketMultipartUploads",
            "s3:DeleteObject"
         ],
         "Resource":"arn:aws:s3:::<bucket-name>/*"
      }
   ]
}
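
If you prefer to script this step, the following is a minimal boto3 sketch. It assumes AWS credentials with IAM permissions are configured locally and that the policy above is saved as connector-user-policy.json; the user name, policy name, and file name are placeholders for this sketch.

# Minimal sketch: create the connector's IAM user, attach the policy, and
# generate the access key pair needed for the connector configuration (Step 7).
import boto3

iam = boto3.client("iam")
USER_NAME = "confluent-databricks-connector"   # placeholder user name

iam.create_user(UserName=USER_NAME)

with open("connector-user-policy.json") as f:
    iam.put_user_policy(
        UserName=USER_NAME,
        PolicyName="databricks-delta-lake-staging",  # placeholder policy name
        PolicyDocument=f.read(),
    )

key = iam.create_access_key(UserName=USER_NAME)["AccessKey"]
print("Access key ID:    ", key["AccessKeyId"])
print("Secret access key:", key["SecretAccessKey"])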

Step 6: Create a table

Create a table using a Databricks Notebook. The example in this step uses the Datagen Source Connector for Confluent Cloud with a topic named pageviews.

The following is the example schema that the Datagen Source produces for the pageviews topic. A sketch for creating a matching table follows the schema.

{
  "connect.name": "ksql.pageviews",
  "fields": [
    {
      "name": "viewtime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "pageid",
      "type": "string"
    }
  ],
  "name": "pageviews",
  "namespace": "ksql",
  "type": "record"
}
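
In a notebook cell you can run a CREATE TABLE statement directly. If you would rather create the table from a local script, the following is a minimal sketch using the databricks-sql-connector package. The hostname, HTTP path, and token are the values described in Step 7, and the column types are derived from the Avro schema above (long maps to BIGINT, string to STRING).

# Minimal sketch: create the pageviews Delta table from outside a notebook.
# Requires the databricks-sql-connector package (pip install databricks-sql-connector).
from databricks import sql

connection = sql.connect(
    server_hostname="dbc-12345df33-0e53.cloud.databricks.com",          # see Step 7
    http_path="sql/protocolv1/o/123456789101112/1004-123456-voice40",   # see Step 7
    access_token="<delta-lake-token>",
)
cursor = connection.cursor()

# Columns follow the Avro schema above; add any extra columns the connector
# expects (see the connector documentation).
cursor.execute("""
    CREATE TABLE IF NOT EXISTS pageviews (
      viewtime BIGINT,
      userid   STRING,
      pageid   STRING
    ) USING DELTA
""")

cursor.close()
connection.close()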

Step 7: Gather connector configuration information

Important

You must have the following information to complete the connector configuration using the Confluent Cloud Console or CLI. An optional sketch for sanity-checking these values follows the list.

  • Databricks server hostname: For example, dbc-12345df33-0e53.cloud.databricks.com. The hostname is available on the Databricks Create Cluster page.

  • HTTP path: For example, sql/protocolv1/o/123456789101112/1004-123456-voice40. The path is available on the Databricks Create Cluster page.

    [Screen capture: Hostname and HTTP path]

  • Delta Lake token: You can use an existing access token or generate a new one. Go to Settings > User Settings > Access Tokens.

    [Screen capture: Access Token]

  • Delta Lake table name: pageviews is used for the steps in this example.

  • User AWS access key ID and secret access key. For details, see the AWS documentation.

  • Staging S3 bucket name: For example, confluent-databricks.
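
Before entering these values in the Confluent Cloud Console or CLI, you can optionally sanity-check them with a short script. The following sketch assumes the boto3 and databricks-sql-connector packages; it confirms that the connector user's access keys can reach the staging bucket and that the table is visible with the Delta Lake token.

# Optional sanity check for the values gathered in this step.
import boto3
from databricks import sql

# Staging bucket access with the connector user's keys (Step 5).
s3 = boto3.client(
    "s3",
    aws_access_key_id="<user-access-key-id>",
    aws_secret_access_key="<user-secret-access-key>",
)
s3.head_bucket(Bucket="confluent-databricks")   # raises if the bucket is unreachable

# Delta Lake connectivity and table name.
connection = sql.connect(
    server_hostname="dbc-12345df33-0e53.cloud.databricks.com",
    http_path="sql/protocolv1/o/123456789101112/1004-123456-voice40",
    access_token="<delta-lake-token>",
)
cursor = connection.cursor()
cursor.execute("DESCRIBE TABLE pageviews")      # fails if the table name is wrong
print(cursor.fetchall())
cursor.close()
connection.close()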