重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Amazon DynamoDB Sink Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「Amazon DynamoDB Sink Connector for Confluent Platform」を参照してください。

Apache Kafka® から Amazon DynamoDB にメッセージをエクスポートするには、Kafka Connect Amazon DynamoDB Sink Connector for Confluent Cloud を使用します。これにより、Kafka データを DynamoDB キー値とドキュメントデータベースにエクスポートできるようになります。

このコネクターは、Kafka のデータを定期的にポーリングして、Amazon DynamoDB にデータを書き込みます。各 Kafka トピックのデータは、バッチにまとめられてから DynamoDB に送信されます。DynamoDB の制約により、各バッチには変更をキーごとに 1 つしか含めることができません。また、バッチが失敗した場合は、そのつど対処してから次のバッチを処理する必要があります。これらの制限によって、厳密に 1 回のデリバリーを確実に実行します。テーブルが存在しない場合は、コネクターが、その構成や権限に応じて動的にテーブルを作成します。

機能

  • テーブルの自動作成: テーブルは、トピック名に基づいた自動作成およびレコードスキーマに基づいた自動進化ができます。
  • 厳選された構成プロパティ:
    • aws.dynamodb.pk.hash: DynamoDB テーブルのハッシュキーがレコードからどのように抽出されるかを定義します。デフォルトでは、レコードが生成される Kafka パーティション番号がハッシュキーとして使用されます。他のレコードリファレンスをハッシュキーの作成に使用することもできます。例については、「DynamoDB のハッシュキーおよびソートキー」を参照してください。
    • aws.dynamodb.pk.sort: DynamoDB テーブルのソートキーがレコードからどのように抽出されるかを定義します。デフォルトでは、レコードオフセットがソートキーとして使用されます。他のリファレンスからソートキーを作成することもできます。例については、「DynamoDB のハッシュキーおよびソートキー」を参照してください。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

DynamoDB IAM のポリシー

コネクターの IAM ユーザーを作成します。作成したユーザーに IAM ポリシーを割り当てます。ポリシーには少なくとも以下の権限が必要です。

  • CreateTable
  • BatchWriteItem
  • Scan
  • DescribeTable

以下の JSON ポリシーをコピーできます。詳細については、「[JSON] タブでのポリシーの作成」を参照してください。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "<optional-identifier>",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:BatchWriteItem",
                "dynamodb:Scan",
                "dynamodb:DescribeTable"
            ],
            "Resource": "*"
        }
    ]
}

DynamoDB のハッシュキーおよびソートキー

以下の例では、aws.dynamodb.pk.hash および aws.dynamodb.pk.sort の使用方法を示しています。この例では、以下の Avro レコードを使用します。

{
     "ordertime": 1511538140542,
     "orderid": 3243,
     "itemid": "Item_117",
     "orderunits": 1.135368875862714,
     "address": {
       "city": "City_43",
       "state": "State_53",
     }
}

例 1

テーブルのハッシュキーに、レコードが生成された "partition" 番号を設定します。テーブルのソートキーは、レコード "offset" です。以下の例では、これらのデフォルトの構成プロパティを使用します。

  • "aws.dynamodb.pk.hash":"partition"
  • "aws.dynamodb.pk.sort":"offset"

これらのプロパティを使用すると、DynamoDB のテーブルは、以下の例のようになります。

パーティション オフセット アドレス itemid orderid ordertime orderunits
0 6075 {"city":{"S":City_66}, "state":{"S":"State_42},...} Item_246 6075 1503153618445 3.0818679447783652
0 6076 {"city":{"S":City_38}, "state":{"S":"State_49},...} Item_536 6076 1515872966736 1.6264301342871472
0 6077 {"city":{"S":City_32}, "state":{"S":"State_62},...} Item_997 6077 1515872966736 4.189731783402986

例 2

テーブルのハッシュキーに "value.orderid" を設定します。テーブルのソートキーは、レコード "" です。この例ではソートキーが不要であるため、次のように空の文字列を使用できます。"aws.dynamodb.pk.sort":""

  • "aws.dynamodb.pk.hash":"value.orderid"
  • "aws.dynamodb.pk.sort":""

これらのプロパティを使用すると、DynamoDB のテーブルは、以下の例のようになります。

orderid アドレス itemid ordertime orderunits
2007 {"city":{"S":City_69}, "state":{"S":State_19},...} Item_809 1502071602628 8.9866703527786968
2011 {"city":{"S":City_32}, "state":{"S":State_11},...} Item_524 1494848995282 2.581428966318308
2012 {"city":{"S":City_88}, "state":{"S":State_94},...} Item_169 1491811930181 1.5716303109073455

例 3

テーブルのハッシュキーに "value.orderid" を設定します。テーブルのソートキーに "value.ordertime" を設定します。この例では、1 つのレコードフィールド("ordertime")がソートキーとして使用されます。

  • "aws.dynamodb.pk.hash":"value.orderid"
  • "aws.dynamodb.pk.sort":"value.ordertime"

これらのプロパティを使用すると、DynamoDB のテーブルは、以下の例のようになります。

orderid ordertime アドレス itemid orderunits
4520 1519049522647 {"city":{"S":City_99}, "state":{"S":"State_38},...} Item_650 7.658775648983428
4522 1519049522647 {"city":{"S":City_72}, "state":{"S":State_89},...} Item_503 2.1383312466612261
4523 1507101063792 {"city":{"S":City_74}, "state":{"S":State_99},...} Item_369 2.1383312466612261

スループットの管理

コネクターでテーブルが自動生成される場合、10 ユニットの書き込みキャパシティ がプロビジョニングされます。コネクターで、プロビジョニングされたキャパシティよりも高速でレコードを送信する必要がある場合は、以下のエラーメッセージが表示される可能性があります。

Hit provisioning capacity, will retry indefinitely.. Increase your throughput capacity

書き込みキャパシティを増やすか、Amazon DynamoDB Auto Scaling を使用できます。

クイックスタート

このクイックスタートを使用して、Confluent Cloud Amazon DynamoDB Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、Amazon Redshift にイベントをストリームするようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • アマゾンウェブサービス 上の Confluent Cloud クラスターへのアクセスを許可されていること。
  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • AWS および Amazon DynamoDB データベースへのアクセスを許可されていること。詳細については、「DynamoDB IAM のポリシー」を参照してください。
  • データベースと Confluent Cloud クラスターは同じリージョンに存在している必要があります。
  • ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
  • Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
    • 既存の サービスアカウント のリソース ID を入力する。
    • コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
    • Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the Amazon DynamoDB Sink connector card.

Amazon DynamoDB Sink Connector Card

ステップ 4: コネクターの詳細情報を入力します。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク ( * ) は必須項目であることを示しています。

Add Amazon DynamoDB Sink connector 画面で、以下を実行します。

既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。

新しいトピックを作成するには、+Add new topic をクリックします。

ステップ 5: DynamoDB で結果を確認します。

データベースに取り込まれていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png

Confluent CLI を使用する場合

以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行」を参照してください。

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe DynamoDbSink

出力例:

Following are the required configs:
connector.class: DynamoDbSink
input.data.format
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
aws.access.key.id
aws.secret.access.key
tasks.max
topics

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティとオプションのプロパティを示しています。

{
  "name": "DynamoDbSinkConnector_0",
  "config": {
    "topics": "pageviews",
    "input.data.format": "AVRO",
    "connector.class": "DynamoDbSink",
    "name": "DynamoDbSinkConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "aws.access.key.id": "********************",
    "aws.secret.access.key": "****************************************",
    "aws.dynamodb.pk.hash": "value.userid",
    "aws.dynamodb.pk.sort": "value.pageid",
    "table.name.format": "kafka-${topic}",
    "tasks.max": "1"
  }
}

以下のプロパティ定義に注意してください。

  • "name": 新しいコネクターの名前を設定します。
  • "connector.class": コネクターのプラグイン名を指定します。
  • "topics": 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "input.data.format": Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVROJSON_SRPROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(たとえば、Avro、JSON_SR(JSON スキーマ)、および Protobuf)を使用するには、Confluent Cloud Schema Registry を構成しておく必要があります。

  • "aws.dynamodb.pk.hash": DynamoDB テーブルのハッシュキーがレコードからどのように抽出されるかを定義します。デフォルトでは、レコードが生成される Kafka パーティション番号がハッシュキーとして使用されます。ハッシュキーは、他のレコードリファレンスからも作成できます。例については、「DynamoDB のハッシュキーおよびソートキー」を参照してください。デフォルト構成でのパーティションの最大サイズは 10 GB です(Amazon DynamoDB で定義)。

  • "aws.dynamodb.pk.sort": DynamoDB テーブルのソートキーがレコードからどのように抽出されるかを定義します。デフォルトでは、レコードオフセットがソートキーとして使用されます。ソートキーが不要な場合、このプロパティに空の文字列 "" を使用します。ソートキーは、他のレコードリファレンスからも作成できます。例については、「DynamoDB のハッシュキーおよびソートキー」を参照してください。

  • "table.name.format": このプロパティの指定は任意です。デフォルトでは Kafka トピックの名前が使用されます。テーブル名フォーマットを作成するには、構文 ${topic} を使用します。たとえば、トピック orderskafka_${topic} は、テーブル名 kafka_orders にマップされます。

  • "tasks.max": このコネクターで実行できるタスクの最大数。タスクについて詳しくは、Confluent Cloud コネクターの制限事項 を参照してください。

Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。このコネクターでサポートされていない SMT のリストについては、「サポートされない変換」を参照してください。

すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。

ステップ 4: 構成ファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config dynamodb-sink-config.json

出力例:

Created connector DynamoDbSinkConnector_0 lcc-ix4dl

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID          |       Name              | Status  | Type
+-----------+-------------------------+---------+------+
lcc-ix4dl   | DynamoDbSinkConnector_0 | RUNNING | sink

ステップ 6: Redshift で結果を確認します。

データベースに取り込まれていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データの取得元とするトピック(Which topics do you want to get data from?)

topics

特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • 型: list
  • 重要度: 高

入力メッセージ(Input messages)

input.data.format

Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • 重要度: 高

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

AWS 認証情報(AWS credentials)

aws.access.key.id
  • 型: password
  • 重要度: 高
aws.secret.access.key
  • 型: password
  • 重要度: 高

DynamoDB パラメーター(DynamoDB Parameters)

aws.dynamodb.pk.hash
  • 型: string
  • デフォルト: partition
  • 重要度: 高
aws.dynamodb.pk.sort
  • 型: string
  • デフォルト: オフセット
  • 重要度: 高
table.name.format

送信先テーブル名の形式制御文字列。送信元のトピック名を表すプレースホルダーとして「${topic}」を含めることができます。

たとえば、トピック「orders」の場合、kafka_${topic} はテーブル名「kafka_orders」にマップされます。

  • 型: string
  • デフォルト: ${topic}
  • 重要度: 中

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1,...]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png