Databricks Delta Lake(AWS)のセットアップ¶
"厳密に 1 回" のセマンティクスをサポートするには、Databricks Delta Lake へのデリバリー前にデータをステージングできる個別の Amazon S3 バケットを作成する必要があります。以下は、新しい Databricks ワークスペースを作成し、必要な S3 ステージングバケットを追加する手順を示しています。これらの手順は、Databricks および AWS CloudFormation の操作について知識があることを前提としています。Databricks Delta Lake の詳細については、Databricks のドキュメント を参照してください。
注釈
The following steps are derived from information provided in the Databricks documentation.
ステップ 1: Databricks ワークスペースの作成¶
次の手順に従ってワークスペースを作成します。
重要
Databricks Delta Lake ワークスペースのリソースは、必ず Kafka クラスターを実行しているリージョンと同じリージョンに作成してください。
I have data in S3... チェックボックスをオンにして Start Quickstart をクリックします。
Databricks ワークスペースのクイックスタート¶
Databricks によって AWS CloudFormation が起動され、情報があらかじめ入力されたテンプレートが表示されます。
Complete the Quick create stack web form. Databricks pre-populates most of the required fields. Be sure to enter a Data bucket name. You will use this name when creating the S3 bucket in AWS.
ちなみに
You do not need to use the pre-populated IAM and S3 root bucket names. However, using these names ensures that no duplicate resources are created.
AWS CloudFormation のウェブフォーム¶
リソースが作成されたら、Databricks コンソールでワークスペースを利用できます。
ワークスペース¶
ちなみに
The Databricks dialog box does not close automatically. You may need to press Cancel to close it and refresh your browser.
Databricks コンソールで Open をクリックします。
ワークスペースのオープン¶
Databricks コンソールを開いたままで、アマゾンウェブサービス に移動します。
Step 2: Create the S3 staging bucket and policies¶
Complete the following steps to create the S3 staging bucket, verify the IAM role in AWS, and create the bucket policy.
Create the S3 staging bucket using the Data bucket name you specified in the previous step. Use the default settings when creating the S3 bucket.
Find the data bucket access IAM role in the AWS IAM dashboard. Look for the role with the naming convention
<workspace-name>-access-data-buckets
.S3 bucket IAM role¶
IAM ロールのポリシーを確認します。次の例のようなものです。
{ "Statement": [ { "Action": [ "s3:ListBucket", "s3:PutObject", "s3:GetObject", "s3:DeleteObject", "s3:PutObjectAcl" ], "Resource": [ "arn:aws:s3:::<data-bucket-name>/*", "arn:aws:s3:::<data-bucket-name>" ], "Effect": "Allow" } ] }
対象のデータバケットのバケットポリシーを作成します。以下の JSON を使用して インラインポリシー を追加します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "Example permissions", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::<aws-account-id-databricks>:role/<iam-role-for-data-bucket-access>" }, "Action": [ "s3:GetBucketLocation", "s3:ListBucket" ], "Resource": "arn:aws:s3:::<data-bucket-name>" }, { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::<aws-account-id-databricks>:role/<iam-role-for-data-bucket-access>" }, "Action": [ "s3:PutObject", "s3:GetObject", "s3:DeleteObject", "s3:PutObjectAcl" ], "Resource": "arn:aws:s3:::<data-bucket-name>/*" } ] }
詳細については、Databricks の AWS に関するドキュメント を参照してください。
Step 3: Verify the workspace configuration role policy¶
Databricks デプロイの作成に使用される IAM ロールを見つけます。ロールを探すには、Databricks ワークスペース名をクリックします。
ワークスペース名¶
Role ARN に、次の手順で確認するロールの ARN があります。
ロールの ARN¶
AWS IAM ダッシュボード に移動し、ポリシーが次の例のようなものであることを確認します。
{ "Version": "2012-10-17", "Statement": [ { "Action": [ "ec2:AllocateAddress", "ec2:AssociateDhcpOptions", "ec2:AssociateIamInstanceProfile", "ec2:AssociateRouteTable", "ec2:AttachInternetGateway", "ec2:AttachVolume", "ec2:AuthorizeSecurityGroupEgress", "ec2:AuthorizeSecurityGroupIngress", "ec2:CancelSpotInstanceRequests", "ec2:CreateDhcpOptions", "ec2:CreateInternetGateway", "ec2:CreateKeyPair", "ec2:CreateNatGateway", "ec2:CreatePlacementGroup", "ec2:CreateRoute", "ec2:CreateRouteTable", "ec2:CreateSecurityGroup", "ec2:CreateSubnet", "ec2:CreateTags", "ec2:CreateVolume", "ec2:CreateVpc", "ec2:CreateVpcEndpoint", "ec2:DeleteDhcpOptions", "ec2:DeleteInternetGateway", "ec2:DeleteKeyPair", "ec2:DeleteNatGateway", "ec2:DeletePlacementGroup", "ec2:DeleteRoute", "ec2:DeleteRouteTable", "ec2:DeleteSecurityGroup", "ec2:DeleteSubnet", "ec2:DeleteTags", "ec2:DeleteVolume", "ec2:DeleteVpc", "ec2:DeleteVpcEndpoints", "ec2:DescribeAvailabilityZones", "ec2:DescribeIamInstanceProfileAssociations", "ec2:DescribeInstanceStatus", "ec2:DescribeInstances", "ec2:DescribeInternetGateways", "ec2:DescribeNatGateways", "ec2:DescribePlacementGroups", "ec2:DescribePrefixLists", "ec2:DescribeReservedInstancesOfferings", "ec2:DescribeRouteTables", "ec2:DescribeSecurityGroups", "ec2:DescribeSpotInstanceRequests", "ec2:DescribeSpotPriceHistory", "ec2:DescribeSubnets", "ec2:DescribeVolumes", "ec2:DescribeVpcs", "ec2:DetachInternetGateway", "ec2:DisassociateIamInstanceProfile", "ec2:DisassociateRouteTable", "ec2:ModifyVpcAttribute", "ec2:ReleaseAddress", "ec2:ReplaceIamInstanceProfileAssociation", "ec2:ReplaceRoute", "ec2:RequestSpotInstances", "ec2:RevokeSecurityGroupEgress", "ec2:RevokeSecurityGroupIngress", "ec2:RunInstances", "ec2:TerminateInstances" ], "Resource": [ "*" ], "Effect": "Allow", "Sid": "Stmt1403287045000" }, { "Action": "iam:PassRole", "Resource": "arn:aws:iam::<aws-account-id-databricks>:role/<iam-role-for-s3-access>", "Effect": "Allow" }, { "Condition": { "StringLike": { "iam:AWSServiceName": "spot.amazonaws.com" } }, "Action": [ "iam:CreateServiceLinkedRole", "iam:PutRolePolicy" ], "Resource": [ "arn:aws:iam::*:role/aws-service-role/spot.amazonaws.com/AWSServiceRoleForEC2Spot" ], "Effect": "Allow" } ] }
Step 4: Create a cluster and a new user on AWS¶
Create a cluster in Databricks using the Instance Profile created by AWS CloudFormations. To get the Instance Profile name, go to Settings > Admin Console > Instance Profiles in Databricks.
Create Cluster¶
Create a new AWS user. Be sure to give the user (the connector) programmatic access.
Create new AWS user¶
On the Set permissions dialog, select Attach existing policies directly.
Add user policy¶
This policy is the same as the one used for the Amazon S3 sink
connector
with the addition of "s3:PutObjectTagging"
, "s3:GetObjectTagging"
, and
"s3:DeleteObject"
. The connector requires DeleteObject
so it can delete
files in the staging S3 bucket once they are copied to the Delta Lake table.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListAllMyBuckets"
],
"Resource": "arn:aws:s3:::*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::<bucket-name>"
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:ListMultipartUploadParts",
"s3:ListBucketMultipartUploads",
"s3:PutObjectTagging",
"s3:GetObjectTagging",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::<bucket-name>/*"
}
]
}
Step 5: Create a table¶
Databricks Notebook を使用してテーブルを作成します。テーブルの作成には、以下の例を使用します。この例には、pageviews
Kafka トピックと Datagen Source Connector for Confluent Cloud のスキーマ例が示されています。
{
"connect.name": "ksql.pageviews",
"fields": [
{
"name": "viewtime",
"type": "long"
},
{
"name": "userid",
"type": "string"
},
{
"name": "pageid",
"type": "string"
}
],
"name": "pageviews",
"namespace": "ksql",
"type": "record"
}
%sql
CREATE TABLE pageviews (viewtime LONG, userid STRING, pageid STRING, partition INT) USING DELTA
partition
という名前のフィールドが追加されるので、テーブルを定義する際に必ず partition INT
を含めてください。Step 6: Gather connector configuration information¶
重要
Confluent Cloud Console または CLI を使用してコネクター構成を完了するには、以下の情報が必要です。
Databricks サーバーのホスト名: たとえば、
dbc-12345df33-0e53.cloud.databricks.com
です。ホスト名は、Databricks Create Cluster ページに表示されます。HTTP パス: たとえば、
sql/protocolv1/o/123456789101112/1004-123456-voice40
です。パスは、Databricks Create Cluster ページに表示されます。ホスト名と HTTP パス¶
Delta Lake トークン: トークンを探すか、新しいアクセストークンを生成できます。Settings、User Settings、Access Tokens の順に移動します。
アクセストークン¶
Delta Lake テーブル名: この例の手順では、
pageviews
が使用されます。ユーザーの AWS アクセスキー ID と シークレットアクセスキー。詳細については、AWS のドキュメント を参照してください。
ステージング S3 バケット名: たとえば
confluent-databricks
です。