Amazon DynamoDB CDC Source Connector for Confluent Cloud

The fully-managed Amazon DynamoDB CDC Source connector for Confluent Cloud moves data from Amazon DynamoDB to an Apache Kafka® topic.

Note

If you require private networking for fully-managed connectors, make sure to set up the proper networking beforehand. For more information, see Manage Networking for Confluent Cloud Connectors.

Features

The Amazon DynamoDB CDC Source connector includes the following features:

  • IAM user authentication: The connector supports authenticating to DynamoDB using IAM user access credentials.

  • Provider integration support: The connector supports IAM role-based authorization using Confluent Provider Integration. For more information about provider integration setup, see IAM roles authentication.

  • Customizable API endpoints: The connector allows you to specify an AWS DynamoDB API and Resource Group Tag API endpoint.

  • Kafka cluster authentication customization: The connector supports authenticating a Kafka cluster using API keys and/or service accounts.

  • Snapshot mode customization: The connector allows you to configure one of the following modes for snapshots:

    • SNAPSHOT: Performs only a one-time scan (snapshot) of the existing data in all source tables simultaneously.
    • CDC: Performs only CDC using DynamoDB Streams, without an initial snapshot, for all streams simultaneously.
    • SNAPSHOT_CDC (Default): Performs an initial snapshot of all configured tables and, once the snapshot is complete, starts CDC streaming using DynamoDB Streams.
  • Seamless table streaming: The connector supports the following two modes to provide seamless table streaming:

    • TAG_MODE: Auto-discover multiple DynamoDB tables and stream them simultaneously (that is, dynamodb.table.discovery.mode is set to TAG).
    • INCLUDELIST_MODE: Explicitly specify multiple DynamoDB table names and stream them simultaneously (that is, dynamodb.table.discovery.mode is set to INCLUDELIST).
  • Automatic topic creation: The connector supports the auto-creation of topics with the name of the table, with a customer-provided prefix and suffix using TopicRegexRouter Single Message Transformation (SMT).

  • Supported data formats: The connector supports Avro, Protobuf, and JSON Schema output formats. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). For more information, see Schema Registry Enabled Environments.

  • Schema management: The connector supports Schema Registry, Schema Context and Reference Subject Naming Strategy.

  • AWS DynamoDB scanning capabilities: The connector includes the following scanning capabilities:

    • Parallel Scans: A DynamoDB table can be logically divided into multiple segments. The connector will divide the table into five logical segments and scan these segments in parallel.
    • Pagination: Tables are scanned sequentially, and a scan result’s response can fetch no more than 1 MB of data. Since tables can be large, scan request responses are paginated. With each response, a LastEvaluatedKey is returned. This LastEvaluatedKey from a scan response should be used as the ExclusiveStartKey for the next scan request. If no LastEvaluatedKey is returned, it indicates that the end of the result set has been reached.
    • Non-isolated scans: To scan an entire table, the task makes multiple successive scan requests, submitting the appropriate ExclusiveStartKey with each request. For a large table, this process may take hours, and the snapshot captures items as they are at the time each scan request is made, not as they were when the snapshot operation started.
    • Eventual Consistency: By default, a scan uses eventually consistent reads when accessing items in a table. Therefore, the results from an eventually consistent scan may not reflect the latest item changes at the time the scan iterates through each item in the table. A snapshot, on the other hand, only captures data from committed transactions. As a result, a scan will not include data from ongoing uncommitted transactions. Additionally, a snapshot does not need to manage or track any ongoing transactions on a DynamoDB table.
  • Custom offset support: The connector allows you to configure custom offsets using the Confluent Cloud user interface to prevent data loss and data duplication.

  • Tombstone event and record deletion management: The connector allows you to manage tombstone events and deleted records. Note that when the connector detects a delete event, it creates two event messages:

    • A delete event message with op type d and document field with the table primary key:

      {
         "op": "d",
         "key": {
           "id": "5028"
         },
         "value": {
           "document": "5028"
         }
      }
      
    • A tombstone record with Kafka Record Key as the table primary key value and Kafka Record Value as null:

      {
         "key": {
           "id": "5028"
         },
         "value": null
      }
      

      Note that Kafka log compaction uses this to know that it can delete all messages for this key.

    Tombstone message sample

    {
       "topic": "table1",
       "key": {
         "id": "5028"
       },
       "value": null,
       "partition": 0,
       "offset": 1
    }
    
  • Lease table prefix customization: The connector supports naming lease tables with a prefix.
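The Parallel Scans and Pagination capabilities listed above can be illustrated with a small sketch. It is not the connector's implementation: `fake_scan`, `ITEMS`, and `PAGE_SIZE` are hypothetical stand-ins for a DynamoDB table so that the example runs without AWS access, and the response shape (Items plus an optional LastEvaluatedKey) mirrors what a DynamoDB Scan response returns.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Iterator, List, Optional

# Hypothetical in-memory table; items are assigned to segments round-robin.
ITEMS = [{"id": str(i)} for i in range(23)]
PAGE_SIZE = 4  # stands in for the 1 MB limit on a single scan response


def fake_scan(TableName: str, Segment: int, TotalSegments: int,
              ExclusiveStartKey: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Mimic a Scan response: one page of Items and, if more remain, a LastEvaluatedKey."""
    segment_items = ITEMS[Segment::TotalSegments]
    start = 0
    if ExclusiveStartKey is not None:
        start = segment_items.index(ExclusiveStartKey) + 1
    page = segment_items[start:start + PAGE_SIZE]
    response: Dict[str, Any] = {"Items": page}
    if start + PAGE_SIZE < len(segment_items):
        response["LastEvaluatedKey"] = page[-1]
    return response


def scan_segment(scan: Callable[..., Dict[str, Any]], table: str,
                 segment: int, total_segments: int) -> Iterator[Dict[str, Any]]:
    """Yield every item of one segment, following LastEvaluatedKey page by page."""
    kwargs: Dict[str, Any] = {
        "TableName": table, "Segment": segment, "TotalSegments": total_segments,
    }
    while True:
        response = scan(**kwargs)
        yield from response.get("Items", [])
        last_key = response.get("LastEvaluatedKey")
        if last_key is None:
            return  # no LastEvaluatedKey: end of the result set for this segment
        kwargs["ExclusiveStartKey"] = last_key


def scan_table(scan: Callable[..., Dict[str, Any]], table: str,
               total_segments: int = 5) -> List[Dict[str, Any]]:
    """Scan all segments in parallel and return the combined items."""
    with ThreadPoolExecutor(max_workers=total_segments) as pool:
        per_segment = pool.map(
            lambda s: list(scan_segment(scan, table, s, total_segments)),
            range(total_segments),
        )
        return [item for items in per_segment for item in items]
```

The `scan` argument is injected so that the paging loop can be exercised against the fake. A real DynamoDB client's scan operation takes the same parameter names (TableName, Segment, TotalSegments, ExclusiveStartKey).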

Limitations

Be sure to review the following information.

IAM policy for DynamoDB CDC

The following permissions are required for the Amazon DynamoDB CDC Source connector:

  • DescribeTable
  • DescribeStream
  • ListTagsOfResource
  • DescribeLimits
  • GetRecords
  • GetShardIterator
  • Scan
  • ListStreams
  • ListTables
  • ListGlobalTables
  • GetResources

You can copy the following JSON policy. For more information, see Creating policies on the JSON tab.

 {
   "Version": "2012-10-17",
   "Statement": [
     {
       "Effect": "Allow",
       "Action": [
         "dynamodb:DescribeTable",
         "dynamodb:DescribeStream",
         "dynamodb:ListTagsOfResource",
         "dynamodb:DescribeLimits",
         "dynamodb:GetRecords",
         "dynamodb:GetShardIterator",
         "dynamodb:Scan"
       ],
       "Resource": [
         "arn:aws:dynamodb:*:*:table/*"
       ]
     },
     {
       "Effect": "Allow",
       "Action": [
         "dynamodb:CreateTable",
         "dynamodb:DescribeTable",
         "dynamodb:GetItem",
         "dynamodb:PutItem",
         "dynamodb:Scan",
         "dynamodb:UpdateItem"
       ],
       "Resource": [
         "arn:aws:dynamodb:*:*:table/<dynamodb.cdc.checkpointing.table.prefix>-*"
       ]
     },
     {
       "Effect": "Allow",
       "Action": [
         "dynamodb:ListStreams",
         "dynamodb:ListTables",
         "dynamodb:ListGlobalTables",
         "tag:GetResources"
       ],
       "Resource": [
         "*"
       ]
     }
   ]
 }
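Because an IAM policy document must be plain JSON, it can help to generate the policy with the checkpointing prefix already substituted. The following is a minimal sketch under stated assumptions: the function name `build_connector_policy` is hypothetical, and the checkpointing resource is written as the prefix followed by `*`, which also covers every table name matched by the `<prefix>-*` pattern in the policy above.

```python
import json
from typing import Any, Dict


def build_connector_policy(checkpoint_prefix: str) -> Dict[str, Any]:
    """Return the three-statement policy with the checkpointing prefix filled in."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # source tables
                "Effect": "Allow",
                "Action": [
                    "dynamodb:DescribeTable", "dynamodb:DescribeStream",
                    "dynamodb:ListTagsOfResource", "dynamodb:DescribeLimits",
                    "dynamodb:GetRecords", "dynamodb:GetShardIterator",
                    "dynamodb:Scan",
                ],
                "Resource": ["arn:aws:dynamodb:*:*:table/*"],
            },
            {   # checkpointing tables created by the connector
                "Effect": "Allow",
                "Action": [
                    "dynamodb:CreateTable", "dynamodb:DescribeTable",
                    "dynamodb:GetItem", "dynamodb:PutItem",
                    "dynamodb:Scan", "dynamodb:UpdateItem",
                ],
                # Assumption: prefix + "*" is used here as a superset of "<prefix>-*".
                "Resource": [f"arn:aws:dynamodb:*:*:table/{checkpoint_prefix}*"],
            },
            {   # listing and tag discovery
                "Effect": "Allow",
                "Action": [
                    "dynamodb:ListStreams", "dynamodb:ListTables",
                    "dynamodb:ListGlobalTables", "tag:GetResources",
                ],
                "Resource": ["*"],
            },
        ],
    }


policy_json = json.dumps(build_connector_policy("connect-KCL-"), indent=2)
```

The resulting `policy_json` string contains no comments, so it can be pasted into the JSON tab as-is.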

Note the following about the previous JSON policy:

DynamoDB tables used as data sources
  • The first statement contains the permissions required for the DynamoDB tables from which data is sourced.
Checkpointing table created by the connector
  • The second statement contains the permissions required for the checkpointing table created during the CDC phase.
  • <dynamodb.cdc.checkpointing.table.prefix> is the configured prefix for the checkpointing table. The Resource value is a wildcard pattern that matches the checkpointing tables.
DynamoDB Streams used as data sources
  • The third statement contains the permissions required for the DynamoDB Streams from which data is sourced.

Offset guidance

The connector currently works with the following offsets:

[
    {

       // TypeA: this offset is logged once per table and should not be modified

        "partition": {
        "tableName": "<TableName>",
        "cdcStarted": "true"
        },
        "offset": {
        "cdcStarted": "true"
        }
    },

    {
       // TypeB: This offset is logged 5 times (segment-0, ... segment-4) for each table and should not be modified.

        "partition": {
        "segment": "0",
        "tableName": "<TableName>"
        },
        "offset": {
        "segment": "0",
        "snapshotCount": "<SnapshotCount>",
        "snapshotStartTime": <SnapshotStartTime>,
        "tableName": "<TableName>",
        "totalSegments": "<0-5>"
        }
    },
    {

      // TypeC: This offset is logged once for each table and can be modified by the user.

        "partition": {
        "stream": "<TableName>"
        },
        "offset": {
        "cdc.start.position.timestamp": "<Timestamp>",    // required
        "cdc.start.position.timestamp.format": "<Timestamp-format>"        // optional
        }
    }
]
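The three offset shapes above can be told apart by the keys in their partition object. The sketch below is an illustration only; the function names `classify_offset` and `editable_offsets` are hypothetical and not part of any Confluent API. It picks out the TypeC entries, which are the only ones described as user-modifiable.

```python
from typing import Any, Dict, List


def classify_offset(entry: Dict[str, Any]) -> str:
    """Return "TypeA", "TypeB", or "TypeC" based on the partition keys."""
    partition = entry.get("partition", {})
    if "stream" in partition:
        return "TypeC"   # per-stream CDC start position, editable
    if "segment" in partition:
        return "TypeB"   # per-segment snapshot progress, do not modify
    if "cdcStarted" in partition:
        return "TypeA"   # per-table CDC marker, do not modify
    return "unknown"


def editable_offsets(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep only the offsets that are safe to edit."""
    return [e for e in entries if classify_offset(e) == "TypeC"]


sample = [
    {"partition": {"tableName": "t1", "cdcStarted": "true"},
     "offset": {"cdcStarted": "true"}},
    {"partition": {"segment": "0", "tableName": "t1"},
     "offset": {"segment": "0", "tableName": "t1"}},
    {"partition": {"stream": "t1"},
     "offset": {"cdc.start.position.timestamp": "2024-07-01T10:46:16Z"}},
]
```

Calling `editable_offsets(sample)` returns only the last entry, the one keyed by `stream`.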

Update the offset

You can only edit the following offsets for a DynamoDB Stream.

{
    "partition": {
        "stream": "<TableName>" // required
    },
    "offset": {
        "cdc.start.position.timestamp": "<Timestamp>",   // required
        "cdc.start.position.timestamp.format": ""        // optional
    }
}

Considerations:

  • You can change the offset for each DynamoDB Stream individually. A task starts streaming data from the specified timestamp in the configured DynamoDB Stream.
  • If no records in the DynamoDB Stream are older than the specified timestamp, all records from the stream are consumed again (equivalent to TRIM_HORIZON).
  • If the specified timestamp is later than the most recent record in the corresponding DynamoDB Stream, no existing records from the stream are consumed (equivalent to LATEST).
  • A new DynamoDB table (leaseTable) is created in the customer’s AWS account on every offset modification after the connector enters the CDC phase.
  • This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
  • Do not modify offsets of TypeA or TypeB. Edit only TypeC offsets.
  • If cdc.start.position.timestamp.format is not provided, it defaults to yyyy-MM-dd'T'HH:mm:ss'Z'.
  • You must specify cdc.start.position.timestamp in the format given by cdc.start.position.timestamp.format (configured or default).
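As an illustration of the last two considerations, the following sketch builds a TypeC offset whose timestamp follows the default pattern yyyy-MM-dd'T'HH:mm:ss'Z'. The function name `build_stream_offset` is hypothetical, and the sketch assumes that the literal Z in the pattern denotes UTC, which the text above does not state explicitly.

```python
from datetime import datetime, timezone
from typing import Any, Dict

# Python strftime equivalent of the Java pattern yyyy-MM-dd'T'HH:mm:ss'Z'
DEFAULT_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def build_stream_offset(table_name: str, start: datetime) -> Dict[str, Any]:
    """Build the editable offset for one DynamoDB Stream using the default format."""
    if start.tzinfo is None:
        raise ValueError("start must be timezone-aware")
    # Assumption: the timestamp is expressed in UTC.
    stamp = start.astimezone(timezone.utc).strftime(DEFAULT_FORMAT)
    return {
        "partition": {"stream": table_name},
        # The format field is optional, so it is omitted and the default applies.
        "offset": {"cdc.start.position.timestamp": stamp},
    }


offset = build_stream_offset(
    "table1", datetime(2024, 7, 1, 10, 46, 16, tzinfo=timezone.utc)
)
```

If you configure a different cdc.start.position.timestamp.format, the timestamp string must be rendered with that pattern instead.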

JSON payload

The following table provides a description of the unique fields in the JSON payload for managing offsets of the DynamoDB CDC Source connector.

Field | Definition | Required/Optional
stream | DynamoDB stream configured for CDC. | Required
cdc.start.position.timestamp | Used to specify the timestamp in the stream where a new application should start from. | Required
cdc.start.position.timestamp.format | The format of the timestamp in the stream where a new application should start from. The format should abide by the patterns specified in java.time.format.DateTimeFormatter. The default is yyyy-MM-dd'T'HH:mm:ss'Z'. | Optional

Connector events

This section describes the events the Amazon DynamoDB CDC connector creates in different scenarios.

Schemas and records created with the Avro converter

The Amazon DynamoDB CDC connector creates events the Avro converter can leverage to manage schemas and records effectively.

Consider the following mapping and data representation:

  • Topic name: <tableName> (you can change this using the TopicRouting SMTs)

  • Topic schema:

    {
        "schema": {
            "connect.name": "io.confluent.connect.dynamodb.<tableName>.Envelope",
            "fields": [
                {
                    "name": "op",
                    "type": "string"
                },
                {
                    "name": "ts_ms",
                    "type": "long"
                },
                {
                    "name": "ts_us",
                    "type": "long"
                },
                {
                    "name": "ts_ns",
                    "type": "long"
                },
                {
                    "default": null,
                    "name": "source",
                    "type": [
                    "null",
                    {
                        "connect.name": "io.confluent.connector.dynamodb.Source",
                        "fields": [
                        {
                            "default": null,
                            "name": "version",
                            "type": [
                            "null",
                            "string"
                            ]
                        },
                        {
                            "name": "tableName",
                            "type": "string"
                        },
                        {
                            "name": "sync_mode",
                            "type": "string"
                        },
                        {
                            "name": "ts_ms",
                            "type": "long"
                        },
                        {
                            "name": "ts_us",
                            "type": "long"
                        },
                        {
                            "name": "ts_ns",
                            "type": "long"
                        },
                        {
                            "default": null,
                            "name": "snapshotStartTime",
                            "type": [
                            "null",
                            "long"
                            ]
                        },
                        {
                            "default": null,
                            "name": "snapshotCount",
                            "type": [
                            "null",
                            "int"
                            ]
                        },
                        {
                            "default": null,
                            "name": "segment",
                            "type": [
                            "null",
                            "int"
                            ]
                        },
                        {
                            "default": null,
                            "name": "totalSegments",
                            "type": [
                            "null",
                            "int"
                            ]
                        },
                        {
                            "default": null,
                            "name": "shard_Id",
                            "type": [
                            "null",
                            "string"
                            ]
                        },
                        {
                            "default": null,
                            "name": "seq_No",
                            "type": [
                            "null",
                            "string"
                            ]
                        }
                        ],
                        "name": "Source",
                        "namespace": "io.confluent.connector.dynamodb",
                        "type": "record"
                    }
                    ]
                },
                {
                    "default": null,
                    "name": "before",
                    "type": [
                    "null",
                    {
                        "connect.name": "io.confluent.connect.dynamodb.<tableName>.Value",
                        "fields": [
                        {
                            "default": null,
                            "name": "document",
                            "type": [
                            "null",
                            "string"
                            ]
                        }
                        ],
                        "name": "Value",
                        "type": "record"
                    }
                    ]
                },
                {
                    "default": null,
                    "name": "after",
                    "type": [
                    "null",
                    "Value"
                    ]
                }
        ],
        "name": "Envelope",
        "namespace": "io.confluent.connect.dynamodb.<tableName>",
        "type": "record"
      }
    }
    

The following table describes each of the field names in the previous example.

Field Name Description
schema The value’s schema, which describes the structure of the value’s payload. For a particular table, all events will have the same schema
connect.name The name of the value’s schema. For the event envelope, this is io.confluent.connect.dynamodb.<tableName>.Envelope.
op

Mandatory string that describes the type of operation that caused the connector to generate the event. Valid values are:

  • c: create
  • u: update
  • d: delete
  • r: read (applies to only snapshots)
ts_ms, ts_us, ts_ns Field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. In the source object, ts_ms indicates the approximate timestamp at which change event was added in DDB Stream. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the DDB Stream and the connector.
source

Mandatory field that describes the source metadata for the event. This field contains information that you can use to compare this event with other events, with regard to the origin of the events and the order in which the events occurred. The source metadata includes:

  • Source version (to denote evolution in source schema)
  • Table name
  • SYNC_MODE, valid values are:
    • CDC (when in Streaming phase)
    • SNAPSHOT (when in SNAPSHOT phase)
  • Timestamp:
    • SNAPSHOT: Timestamp at which table record was scanned.
    • CDC: Approximate timestamp at which event was added into DDB Streams.
  • SNAPSHOT fields:
    • snapshotStartTime: Timestamp when the snapshot was started for the table-segment to which this event belongs. Only present in SNAPSHOT events.
    • snapshotCount: Number of snapshot events scanned before this event in this table-segment. Only present in SNAPSHOT events.
    • segment: The table-segment to which this event belongs. Only present in SNAPSHOT events.
    • totalSegments: Total number of table-segments for this table. Only present in SNAPSHOT events.
  • CDC fields:
    • shard_Id: ShardId of the shard in the DDB Stream from which this event was consumed. Only present in CDC events.
    • seq_No: SequenceNumber of this event in the shard in the DDB Stream from which this event was consumed. Only present in CDC events.
before

An optional field that specifies the state of the row before the event occurred. This field will be populated only with CDC events, when:

  • "op": "u"
  • "op": "d"

This field would only be populated when StreamViewType is NEW_AND_OLD_IMAGES on DynamoDB Streams.

after

An optional field that specifies the state of the row after the event occurred. This field will be populated with:

  • "op": "r" (SNAPSHOT)
  • "op": "c" (CDC)
  • "op": "u" (CDC)
connect.name

io.confluent.connect.dynamodb.<tableName>.Value is the schema for the payload’s before and after fields. This schema is specific to the table (tableName)

Names of schemas for before and after fields are of the form io.confluent.connect.dynamodb.<tableName>.Value, which ensures that the schema name is unique in the database. This means that when using the Avro converter, the resulting Avro schema for each table in each logical source has its own evolution and history.

The connector creates different kinds of events in the following scenarios:

Example record: "op: r"

{
   "op": "r",
   "ts_ms": 1719830776204,
   "ts_us": 1719830776204000,
   "ts_ns": 1719830776204000000,
   "source": {
       "io.confluent.connector.dynamodb.Source": {
       "version": null,
       "tableName": "NumTable",
       "sync_mode": "SNAPSHOT",
       "ts_ms": 1719830776204,
       "ts_us": 1719830776204000,
       "ts_ns": 1719830776204000000,
       "snapshotStartTime": {
           "long": 1719830776087
       },
       "snapshotCount": {
           "int": 1
       },
       "segment": {
           "int": 0
       },
       "totalSegments": {
           "int": 5
       },
       "shard_Id": null,
       "seq_No": null
       }
   },
   "before": null,
   "after": {
       "io.confluent.connect.dynamodb.NumTable.Value": {
       "document": {
           "string": "{\"Number2\":{\"N\":\"1\"},\"Number\":{\"N\":\"1\"}}"
       }
     }
   }
}
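A consumer reading such a record typically needs to unwrap the union wrappers, parse the document string, and compare the two ts_ms values. The following sketch shows one way to do that for the record shape above. The helper names are hypothetical, the `EXAMPLE` event is a trimmed copy of the preceding record, and the attribute conversion covers only a few DynamoDB attribute types.

```python
import json
from decimal import Decimal
from typing import Any, Dict, Optional


def unwrap_union(value: Any) -> Any:
    """Strip one union wrapper, e.g. {"long": 5} -> 5. None stays None."""
    if isinstance(value, dict) and len(value) == 1:
        return next(iter(value.values()))
    return value


def simplify(attr: Dict[str, Any]) -> Any:
    """Convert one DynamoDB attribute value such as {"N": "1"} to a Python value."""
    (kind, raw), = attr.items()
    if kind == "N":
        return Decimal(raw)
    if kind == "NULL":
        return None
    if kind == "L":
        return [simplify(v) for v in raw]
    if kind == "M":
        return {k: simplify(v) for k, v in raw.items()}
    return raw  # S, BOOL, and other types are passed through in this sketch


def summarize(event: Dict[str, Any]) -> Dict[str, Any]:
    """Extract the operation, source table, lag, and decoded item from an event."""
    source = unwrap_union(event["source"])
    after = unwrap_union(event["after"])  # None when "after" is null
    item: Optional[Dict[str, Any]] = None
    if after is not None:
        raw_document = unwrap_union(after["document"])
        if raw_document is not None:
            document = json.loads(raw_document)
            item = {name: simplify(attr) for name, attr in document.items()}
    return {
        "op": event["op"],
        "table": source["tableName"],
        "sync_mode": source["sync_mode"],
        # Connector processing time minus the source timestamp
        "lag_ms": event["ts_ms"] - source["ts_ms"],
        "item": item,
    }


EXAMPLE = {
    "op": "r",
    "ts_ms": 1719830776204,
    "source": {"io.confluent.connector.dynamodb.Source": {
        "tableName": "NumTable", "sync_mode": "SNAPSHOT", "ts_ms": 1719830776204,
    }},
    "before": None,
    "after": {"io.confluent.connect.dynamodb.NumTable.Value": {
        "document": {"string": "{\"Number2\":{\"N\":\"1\"},\"Number\":{\"N\":\"1\"}}"},
    }},
}
summary = summarize(EXAMPLE)
```

Numbers are converted to `Decimal` rather than `float` so that DynamoDB numeric values keep their exact precision.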

Quick start

Use this quick start to get up and running with the Amazon DynamoDB CDC Source connector for Confluent Cloud.

Prerequisites

Ensure you have met the following requirements before moving ahead.

  • Kafka cluster credentials. The following lists the different ways you can provide credentials.
    • Enter an existing service account resource ID.
    • Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
    • Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster

See the Quick Start for Confluent Cloud for installation instructions.

Step 2: Add a connector

In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector

Click the DynamoDB CDC Source connector card.


Step 4: Enter the connector details

Note

  • Ensure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.

At the Add Amazon DynamoDB CDC Source Connector screen, complete the following:

  1. Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:
    • My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.
    • Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.
    • Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.
  2. Click Continue.

Step 5: Check the Kafka topic.

After the connector is running, verify that messages are populating your Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.

Using the Confluent CLI

You can use the following steps to set up and run the connector using the Confluent CLI, but first, ensure you have met all prerequisites.

Step 1: List the available connectors

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: List the connector configuration properties

Enter the following command to show the connector configuration properties:

confluent connect plugin describe <connector-plugin-name>

The command output shows the required and optional configuration properties.

Step 3: Create the connector configuration file

Create a JSON file that contains the connector configuration properties. The following example shows required and optional connector properties.

{
  "config": {
    "connector.class": "DynamoDbCdcSource",
    "name": "DynamoDbCdcSourceConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "schema.context.name": "default",
    "aws.access.key.id": "<my-aws-access-key-id>",
    "aws.secret.access.key": "<my-aws-secret-access-key>",
    "output.data.format": "JSON_SR",
    "dynamodb.service.endpoint": "http://localhost:8080",
    "dynamodb.table.discovery.mode": "INCLUDELIST",
    "dynamodb.table.sync.mode": "SNAPSHOT_CDC",
    "dynamodb.table.includelist": "t1, t2",
    "max.batch.size": "1000",
    "poll.linger.ms": "5000",
    "dynamodb.snapshot.max.poll.records": "1000",
    "dynamodb.cdc.checkpointing.table.prefix": "connect-KCL-",
    "dynamodb.cdc.table.billing.mode": "PROVISIONED",
    "dynamodb.cdc.max.poll.records": "5000",
    "tasks.max": "1"
  }
}

For all property values and definitions, see Configuration properties.
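Before loading a configuration file, it can be useful to check it against the conditions stated in the Configuration properties section. The following sketch is a local pre-check only, not something the CLI or the connector provides. The function name `validate` is hypothetical, and it covers just a handful of the documented rules: required credentials, discovery-mode dependencies, valid output formats, and the two documented numeric ranges.

```python
from typing import Dict, List

VALID_FORMATS = {"AVRO", "JSON_SR", "PROTOBUF"}
INT_RANGES = {"max.batch.size": (100, 10000), "poll.linger.ms": (0, 300000)}


def validate(config: Dict[str, str]) -> List[str]:
    """Return a list of human-readable problems; an empty list means none found."""
    problems: List[str] = []

    # kafka.api.key and kafka.api.secret are required in KAFKA_API_KEY mode (the default).
    if config.get("kafka.auth.mode", "KAFKA_API_KEY") == "KAFKA_API_KEY":
        for name in ("kafka.api.key", "kafka.api.secret"):
            if not config.get(name):
                problems.append(f"{name} is required when kafka.auth.mode is KAFKA_API_KEY")

    # Each discovery mode needs its companion property.
    mode = config.get("dynamodb.table.discovery.mode", "INCLUDELIST")
    if mode == "INCLUDELIST" and not config.get("dynamodb.table.includelist"):
        problems.append("dynamodb.table.includelist is required in INCLUDELIST mode")
    if mode == "TAG" and not config.get("aws.resource.tagging.endpoint"):
        problems.append("aws.resource.tagging.endpoint is required in TAG mode")

    # Output format must be one of the documented entries (AVRO is the default).
    fmt = config.get("output.data.format", "AVRO")
    if fmt not in VALID_FORMATS:
        problems.append(f"output.data.format {fmt!r} is not one of {sorted(VALID_FORMATS)}")

    # Numeric properties must fall inside their documented ranges.
    for name, (low, high) in INT_RANGES.items():
        if name in config and not low <= int(config[name]) <= high:
            problems.append(f"{name} must be between {low} and {high}")

    return problems
```

For example, passing the inner `config` object of the JSON file to `validate` returns an empty list when none of these rules is violated.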

Single Message Transforms: For details about adding SMTs using the Confluent CLI, see the Single Message Transformation (SMT) documentation. For a list of SMTs that are not supported with this connector, see Unsupported transformations.

Step 4: Load the configuration file and create the connector

Enter the following command to load the configuration and start the connector:

confluent connect cluster create --config-file <file-name>.json

For example:

confluent connect cluster create --config-file dynamodb-cdc-source-config.json

Example output:

Created connector DynamoDbCdcSourceConnector_0 lcc-ix4dl

Step 5: Check the connector status

Enter the following command to check the connector status:

confluent connect cluster list

Example output:

ID          |       Name                  | Status  | Type
+-----------+-----------------------------+---------+------+
lcc-ix4dl   | DynamoDbCdcSourceConnector_0| RUNNING | source

Step 6: Check the Kafka topic.

After the connector is running, verify that messages are populating your Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.

Configuration properties

Use the following configuration properties with the fully-managed connector.

How should we connect to your data?

name

Sets a name for your connector.

  • Type: string
  • Valid Values: A string at most 64 characters long
  • Importance: high

Kafka Cluster credentials

kafka.auth.mode

Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.

  • Type: string
  • Default: KAFKA_API_KEY
  • Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
  • Importance: high
kafka.api.key

Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password
  • Importance: high
kafka.service.account.id

The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.

  • Type: string
  • Importance: high
kafka.api.secret

Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password
  • Importance: high

Schema Config

key.subject.name.strategy

Determines how to construct the subject name under which the key schema is registered with Schema Registry.

  • Type: string
  • Default: TopicNameStrategy
  • Valid Values: RecordNameStrategy, TopicNameStrategy
  • Importance: medium
schema.context.name

Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.

  • Type: string
  • Default: default
  • Importance: medium
value.subject.name.strategy

Determines how to construct the subject name under which the value schema is registered with Schema Registry.

  • Type: string
  • Default: TopicNameStrategy
  • Valid Values: RecordNameStrategy, TopicNameStrategy, TopicRecordNameStrategy
  • Importance: medium
key.converter.reference.subject.name.strategy

Set the subject reference name strategy for key. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.

  • Type: string
  • Default: DefaultReferenceSubjectNameStrategy
  • Importance: medium
value.converter.reference.subject.name.strategy

Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.

  • Type: string
  • Default: DefaultReferenceSubjectNameStrategy
  • Importance: medium

AWS credentials

authentication.method

Select how you want to authenticate with AWS.

  • Type: string
  • Default: Access Keys
  • Importance: high
aws.access.key.id
  • Type: password
  • Importance: high
provider.integration.id

Select an existing integration that has access to your resource. In case you need to integrate a new IAM role, use provider integration

  • Type: string
  • Importance: high
aws.secret.access.key
  • Type: password
  • Importance: high

Output messages

output.data.format

Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF

  • Type: string
  • Default: AVRO
  • Importance: high
output.data.key.format

Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

  • Type: string
  • Default: AVRO
  • Valid Values: AVRO, JSON_SR, PROTOBUF
  • Importance: high

DynamoDB Details

dynamodb.service.endpoint

AWS DynamoDB API Endpoint

  • Type: string
  • Importance: high
dynamodb.table.sync.mode

Defines the table sync mode. SNAPSHOT_CDC - start with SNAPSHOT and switch to CDC mode on completion; SNAPSHOT - perform SNAPSHOT only; CDC - perform CDC only.

  • Type: string
  • Default: SNAPSHOT_CDC
  • Importance: high
dynamodb.table.discovery.mode

Specifies the table discovery mode that defines the list of tables to be captured.

TAG - use resource tag discovery using AWS Resource Tagging endpoint.

INCLUDELIST - use explicit table names to be included.
  • Type: string
  • Default: INCLUDELIST
  • Importance: high
aws.resource.tagging.endpoint

AWS Resource Group Tag API Endpoint. Required if dynamodb.table.discovery.mode is selected as TAG

  • Type: string
  • Importance: high
dynamodb.table.tag.filters

A semi-colon-separated list of pairs in the form <tag-key>:<value-1>,<value-2> that is used to create tag filters. For example, key1:v1,v2;key2:v3,v4 will include all tags that match key1 key with value of either v1 or v2, and match key2 with value of either v3 or v4. Any keys not specified will be excluded.

  • Type: string
  • Importance: high
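The filter syntax described for dynamodb.table.tag.filters can be made concrete with a short parsing sketch. This is an illustration of the format only, not the connector's parser. The function names are hypothetical, and `table_matches` reflects one reading of the description: a table must match every listed key with one of that key's values.

```python
from typing import Dict, List


def parse_tag_filters(spec: str) -> Dict[str, List[str]]:
    """Parse "key1:v1,v2;key2:v3,v4" into {"key1": ["v1", "v2"], "key2": ["v3", "v4"]}."""
    filters: Dict[str, List[str]] = {}
    for pair in spec.split(";"):
        pair = pair.strip()
        if not pair:
            continue
        key, separator, values = pair.partition(":")
        if not separator or not key.strip():
            raise ValueError(f"expected <tag-key>:<value-1>,<value-2> but got {pair!r}")
        filters[key.strip()] = [v.strip() for v in values.split(",") if v.strip()]
    return filters


def table_matches(tags: Dict[str, str], filters: Dict[str, List[str]]) -> bool:
    """Assumed semantics: every filter key must be present with an allowed value."""
    return all(tags.get(key) in allowed for key, allowed in filters.items())


filters = parse_tag_filters("key1:v1,v2;key2:v3,v4")
```

With these filters, a table tagged key1=v2 and key2=v3 matches, while a table tagged only key1=v2 does not.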
dynamodb.table.includelist

A comma-separated list of DynamoDB table names to be captured. This is required if dynamodb.table.discovery.mode is set to INCLUDELIST

  • Type: string
  • Importance: high
max.batch.size

The maximum number of records that will be returned by the connector to Connect. The connector may still return fewer records if no additional records are available.

  • Type: int
  • Default: 1000
  • Valid Values: [100,…,10000]
  • Importance: medium
poll.linger.ms

The maximum time to wait for a record before returning an empty batch. The call to poll can return early before poll.linger.ms expires if max.batch.size records are received.

  • Type: long
  • Default: 5000 (5 seconds)
  • Valid Values: [0,…,300000]
  • Importance: medium
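The interaction between max.batch.size and poll.linger.ms can be pictured as a loop that returns as soon as the batch is full or the linger time has elapsed. The sketch below models that behavior with an in-memory queue. It is an assumption-laden illustration, not the connector's code, and the name `poll` and its parameters are hypothetical.

```python
import time
from collections import deque
from typing import Any, Deque, List


def poll(queue: Deque[Any], max_batch_size: int, linger_ms: int) -> List[Any]:
    """Collect up to max_batch_size records, waiting at most linger_ms for them."""
    deadline = time.monotonic() + linger_ms / 1000.0
    batch: List[Any] = []
    while len(batch) < max_batch_size:
        if queue:
            batch.append(queue.popleft())
        elif time.monotonic() >= deadline:
            break  # linger time elapsed: return what we have, possibly nothing
        else:
            time.sleep(0.001)  # wait briefly for more records to arrive
    return batch


full = poll(deque([1, 2, 3, 4, 5]), max_batch_size=3, linger_ms=5000)
partial = poll(deque([1, 2]), max_batch_size=3, linger_ms=20)
empty = poll(deque(), max_batch_size=3, linger_ms=20)
```

In this model the first call returns immediately because the batch fills up, while the other two return only after the linger time expires.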

Snapshot Details

dynamodb.snapshot.max.poll.records

Maximum number of records that can be returned in a single DynamoDB read operation. Only applicable to the SNAPSHOT phase. Note that there is a 1 MB size limit as well.

  • Type: int
  • Default: 1000
  • Importance: medium
dynamodb.snapshot.max.rcu.percentage

Configure percentage of table read capacity that will be used as a maximum limit of RCU consumption rate

  • Type: int
  • Default: 50
  • Importance: medium

CDC Details

dynamodb.cdc.initial.stream.position

Specifies the position in the stream where a new application should start from. This is used during initial application bootstrap (when a checkpoint doesn’t exist for a shard or its parents). Used only in CDC mode. Valid values: TRIM_HORIZON - start from the oldest available data record; LATEST - start after the most recent data record (fetch new data); AT_TIMESTAMP - start from the record at or after the specified server-side timestamp.

  • Type: string
  • Importance: medium
cdc.start.position.timestamp

Specifies the timestamp in the stream where a new application should start from. This is used during initial application bootstrap (when a checkpoint doesn’t exist for a shard or its parents). Used only in CDC mode.

  • Type: string
  • Importance: medium
cdc.start.position.timestamp.format

The format of the timestamp in the stream where a new application should start from. The format should abide by the patterns specified in java.time.format.DateTimeFormatter (https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#patterns). This is used during initial application bootstrap (when a checkpoint doesn’t exist for a shard or its parents). Used only in CDC mode.

  • Type: string
  • Default: yyyy-MM-dd'T'HH:mm:ss'Z'
  • Importance: medium
dynamodb.cdc.checkpointing.table.prefix

Prefix for CDC checkpointing tables. The prefix must be unique per connector. The checkpointing table stores the last processed record for each shard and is used to resume from the last processed record if the connector restarts. This is applicable only in CDC mode.

  • Type: string
  • Default: connect-KCL-
  • Importance: medium
dynamodb.cdc.table.billing.mode

Defines the billing mode for the internal checkpoint table created with CDC. Allowed values: PROVISIONED, PAY_PER_REQUEST. The default is PROVISIONED. Use PAY_PER_REQUEST for unpredictable application traffic and on-demand billing mode. Use PROVISIONED for predictable application traffic and provisioned billing mode.

  • Type: string
  • Default: PROVISIONED
  • Importance: medium
dynamodb.cdc.max.poll.records

Maximum number of records that can be returned in a single DynamoDB Streams GetRecords call.

  • Type: int
  • Default: 5000
  • Importance: medium
dynamodb.cdc.checkpointing.table.read.capacity

Read capacity for CDC checkpointing table. The checkpointing table is used to track the leases (shards) of the DynamoDB tables and helps determine the resume position in case of connector restarts.

  • Type: int
  • Default: 50
  • Importance: low
dynamodb.cdc.checkpointing.table.write.capacity

Write capacity for CDC checkpointing table. The checkpointing table is used to track the leases (shards) of the DynamoDB tables and helps determine the resume position in case of connector restarts.

  • Type: int
  • Default: 50
  • Importance: low

Number of tasks for this connector

tasks.max

Maximum number of tasks for the connector.

  • Type: int
  • Valid Values: [1,…]
  • Importance: high

Next steps

  • For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.

  • Try Confluent Cloud on AWS Marketplace with $1000 of free usage for 30 days, and pay as you go. No credit card is required.