Amazon DynamoDB CDC Source Connector for Confluent Cloud¶
The fully-managed Amazon DynamoDB CDC Source connector for Confluent Cloud moves data from Amazon DynamoDB to an Apache Kafka® topic.
Note
If you require private networking for fully-managed connectors, make sure to set up the proper networking beforehand. For more information, see Manage Networking for Confluent Cloud Connectors.
Features¶
The Amazon DynamoDB CDC Source connector includes the following features:
IAM user authentication: The connector supports authenticating to DynamoDB using IAM user access credentials.
Provider integration support: The connector supports IAM role-based authorization using Confluent Provider Integration. For more information about provider integration setup, see the IAM roles authentication.
Customizable API endpoints: The connector allows you to specify an AWS DynamoDB API and Resource Group Tag API endpoint.
Kafka cluster authentication customization: The connector supports authenticating a Kafka cluster using API keys and/or service accounts.
Snapshot mode customization: The connector allows you to configure one of the following modes for snapshots:
- SNAPSHOT: Performs only a one-time scan (snapshot) of the existing data in all source tables simultaneously.
- CDC: Performs only CDC using DynamoDB Streams, without an initial snapshot, for all streams simultaneously.
- SNAPSHOT_CDC (Default): Takes an initial snapshot of all configured tables and, once the snapshot is complete, starts CDC streaming using DynamoDB Streams.
Seamless table streaming: The connector supports the following two modes to provide seamless table streaming:
- TAG_MODE: Auto-discover multiple DynamoDB tables and stream them simultaneously (that is, dynamodb.table.discovery.mode is set to TAG).
- INCLUDELIST_MODE: Explicitly specify multiple DynamoDB table names and stream them simultaneously (that is, dynamodb.table.discovery.mode is set to INCLUDELIST).
Automatic topic creation: The connector supports the auto-creation of topics with the name of the table, with a customer-provided prefix and suffix using TopicRegexRouter Single Message Transformation (SMT).
Supported data formats: The connector supports Avro, Protobuf, and JSON Schema output formats. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). For more information, see Schema Registry Enabled Environments.
Schema management: The connector supports Schema Registry, Schema Context and Reference Subject Naming Strategy.
AWS DynamoDB scanning capabilities: The connector includes the following scanning capabilities:
- Parallel Scans: A DynamoDB table can be logically divided into multiple segments. The connector will divide the table into five logical segments and scan these segments in parallel.
- Pagination: Tables are scanned sequentially, and a single scan response can return no more than 1 MB of data. Since tables can be large, scan responses are paginated. Each response returns a LastEvaluatedKey, which should be used as the ExclusiveStartKey for the next scan request. If no LastEvaluatedKey is returned, the end of the result set has been reached.
- Non-isolated scans: To scan an entire table, the task keeps making subsequent scan requests, submitting the appropriate ExclusiveStartKey with each request. For a large table, this process may take hours, and the snapshot captures items as they are at the time each scan request is made, not as they were when the snapshot operation started.
- Eventual Consistency: By default, a scan uses eventually consistent reads when accessing items in a table. Therefore, the results from an eventually consistent scan may not reflect the latest item changes at the time the scan iterates through each item in the table. A snapshot, on the other hand, only captures data from committed transactions. As a result, a scan will not include data from ongoing uncommitted transactions. Additionally, a snapshot does not need to manage or track any ongoing transactions on a DynamoDB table.
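The pagination and parallel-scan behavior described above can be sketched as follows. This is a minimal illustration and not the connector's implementation. The parameter names Segment, TotalSegments, ExclusiveStartKey, and LastEvaluatedKey come from the DynamoDB Scan API; the helper names (scan_segment, scan_table, make_fake_scan) and the in-memory stand-in for the Scan call are hypothetical, and items are simplified to plain dictionaries rather than DynamoDB attribute values.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Iterator, List, Optional

TOTAL_SEGMENTS = 5  # the connector divides each table into five logical segments


def scan_segment(scan: Callable[..., Dict[str, Any]], table: str, segment: int,
                 total_segments: int = TOTAL_SEGMENTS) -> Iterator[Dict[str, Any]]:
    """Yield every item of one logical segment, following pagination."""
    start_key: Optional[Dict[str, Any]] = None
    while True:
        kwargs: Dict[str, Any] = {
            "TableName": table,
            "Segment": segment,
            "TotalSegments": total_segments,
        }
        if start_key is not None:
            # Resume where the previous page stopped.
            kwargs["ExclusiveStartKey"] = start_key
        page = scan(**kwargs)
        yield from page.get("Items", [])
        start_key = page.get("LastEvaluatedKey")
        if start_key is None:
            break  # no LastEvaluatedKey: end of the result set


def scan_table(scan: Callable[..., Dict[str, Any]], table: str,
               total_segments: int = TOTAL_SEGMENTS) -> List[Dict[str, Any]]:
    """Scan all segments in parallel and collect the items."""
    with ThreadPoolExecutor(max_workers=total_segments) as pool:
        per_segment = pool.map(
            lambda seg: list(scan_segment(scan, table, seg, total_segments)),
            range(total_segments),
        )
        return [item for items in per_segment for item in items]


def make_fake_scan(items: List[Dict[str, Any]], page_size: int = 2):
    """Hypothetical in-memory stand-in for the Scan API (demonstration only)."""
    def fake_scan(TableName, Segment, TotalSegments, ExclusiveStartKey=None):
        mine = [it for it in items if it["id"] % TotalSegments == Segment]
        begin = 0
        if ExclusiveStartKey is not None:
            begin = next(i for i, it in enumerate(mine)
                         if it["id"] == ExclusiveStartKey["id"]) + 1
        page = mine[begin:begin + page_size]
        response: Dict[str, Any] = {"Items": page}
        if begin + page_size < len(mine):
            response["LastEvaluatedKey"] = {"id": page[-1]["id"]}
        return response
    return fake_scan


all_items = scan_table(make_fake_scan([{"id": i} for i in range(23)]), "table1")
```

With a real client, the scan argument could be a function with the same keyword parameters as the DynamoDB Scan operation. Because each page is read at a different time, the combined result is not an isolated point-in-time view of the table.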
Custom offset support: The connector allows you to configure custom offsets using the Confluent Cloud user interface to prevent data loss and data duplication.
Tombstone event and record deletion management: The connector allows you to manage tombstone events and deleted records. Note that when the connector detects a delete event, it creates two event messages:
- A delete event message with op type d and a document field with the table primary key:
{ "op": "d", "key": { "id": "5028" }, "value": { "document": "5028" } }
- A tombstone record with the Kafka record key as the table primary key value and the Kafka record value as null:
{ "key": { "id": "5028" }, "value": null }
Note that Kafka log compaction uses the tombstone record to know that it can delete all messages for this key.
Tombstone message sample
{ "topic": "table1", "key": { "id": "5028" }, "value": null, "partition": 0, "offset": 1 }
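A consumer that maintains a local copy of a table has to treat both messages as a deletion. The following sketch shows one way to do that, assuming messages have already been deserialized into dictionaries shaped like the simplified samples above. The function names classify and apply_messages are hypothetical.

```python
import json
from typing import Any, Dict, Iterable


def classify(message: Dict[str, Any]) -> str:
    """Return 'tombstone', 'delete', or 'change' for a deserialized message."""
    if message.get("value") is None:
        return "tombstone"  # null value: log compaction marker
    if message.get("op") == "d":
        return "delete"     # delete event that precedes the tombstone
    return "change"


def apply_messages(messages: Iterable[Dict[str, Any]],
                   view: Dict[str, Any]) -> Dict[str, Any]:
    """Apply messages in order to a dictionary keyed by the record key."""
    for message in messages:
        key = json.dumps(message["key"], sort_keys=True)
        if classify(message) in ("delete", "tombstone"):
            view.pop(key, None)  # removing twice is harmless
        else:
            view[key] = message["value"]
    return view


sample = [
    {"op": "c", "key": {"id": "1"}, "value": {"document": "1"}},
    {"op": "c", "key": {"id": "5028"}, "value": {"document": "5028"}},
    {"op": "d", "key": {"id": "5028"}, "value": {"document": "5028"}},
    {"key": {"id": "5028"}, "value": None},
]
table_view = apply_messages(sample, {})
```

Handling the delete event and the tombstone idempotently means the consumer behaves the same whether or not tombstones are forwarded.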
Lease table prefix customization: The connector supports naming lease tables with a prefix.
Limitations¶
Be sure to review the following information.
- For connector limitations, see Amazon DynamoDB CDC Source Connector limitations.
- If you plan to use one or more Single Message Transforms (SMTs), see SMT Limitations.
- If you plan to use Confluent Cloud Schema Registry, see Schema Registry Enabled Environments.
IAM policy DynamoDB CDC¶
The following permissions are required for the Amazon DynamoDB CDC Source connector:
- DescribeTable
- DescribeStream
- ListTagsOfResource
- DescribeLimits
- GetRecords
- GetShardIterator
- Scan
- ListStreams
- ListTables
- ListGlobalTables
- GetResources
You can copy the following JSON policy. For more information, see Creating policies on the JSON tab.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:DescribeTable",
"dynamodb:DescribeStream",
"dynamodb:ListTagsOfResource",
"dynamodb:DescribeLimits",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:Scan",
"dynamodb:DeleteItem"
],
"Resource": [
"arn:aws:dynamodb:*:*:table/*"
]
},
{
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"dynamodb:DescribeTable",
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:Scan",
"dynamodb:UpdateItem"
],
"Resource": [
"arn:aws:dynamodb:*:*:table/<dynamodb.cdc.checkpointing.table.prefix>-*"
]
},
{
"Effect": "Allow",
"Action": [
"dynamodb:ListStreams",
"dynamodb:ListTables",
"dynamodb:ListGlobalTables",
"tag:GetResources"
],
"Resource": [
"*"
]
}
]
}
Note the following about the previous JSON policy:
- DynamoDB tables used as data sources
- The first statement contains the permissions required for the DynamoDB tables from which data is sourced.
- Checkpointing table created by the connector
- The second statement contains the permissions required for the checkpointing table created during the CDC phase. <dynamodb.cdc.checkpointing.table.prefix> is the prefix configured for the checkpointing table, and the resource pattern matches every table whose name starts with that prefix.
- DynamoDB streams used as data sources
- The third statement contains the permissions required for the DynamoDB Streams from which data is sourced.
Offset guidance¶
The connector currently works with the following offsets:
[
{
// TypeA: this offset is logged once per table and should not be modified
"partition": {
"tableName": "<TableName>",
"cdcStarted": "true"
},
"offset": {
"cdcStarted": "true"
}
},
{
// TypeB: This offset is logged 5 times (segment-0, ... segment-4) for each table and should not be modified.
"partition": {
"segment": "0",
"tableName": "<TableName>"
},
"offset": {
"segment": "0",
"snapshotCount": "<SnapshotCount>",
"snapshotStartTime": <SnapshotStartTime>,
"tableName": "<TableName>",
"totalSegments": "<0-5>"
}
},
{
// TypeC: This offset is logged once for each table and can be modified by the user.
"partition": {
"stream": "<TableName>"
},
"offset": {
"cdc.start.position.timestamp": "<Timestamp>", // required
"cdc.start.position.timestamp.format": "<Timestamp-format>" // optional
}
}
]
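Because only one of the three offset types is meant to be edited, a script that prepares offset changes may want to check the type first. The sketch below distinguishes the types by the keys in the partition object, following the layout shown above. The function names offset_type and is_editable are hypothetical.

```python
from typing import Any, Dict


def offset_type(entry: Dict[str, Any]) -> str:
    """Classify an offset entry as TypeA, TypeB, or TypeC by its partition keys."""
    partition = entry.get("partition", {})
    if "stream" in partition:
        return "TypeC"  # per-stream CDC start position
    if "segment" in partition:
        return "TypeB"  # per-segment snapshot progress
    if "cdcStarted" in partition:
        return "TypeA"  # per-table marker that CDC has started
    raise ValueError(f"unrecognized offset partition: {partition!r}")


def is_editable(entry: Dict[str, Any]) -> bool:
    """Only TypeC offsets are intended to be modified by users."""
    return offset_type(entry) == "TypeC"


type_a = {"partition": {"tableName": "t1", "cdcStarted": "true"},
          "offset": {"cdcStarted": "true"}}
type_b = {"partition": {"segment": "0", "tableName": "t1"},
          "offset": {"segment": "0", "tableName": "t1"}}
type_c = {"partition": {"stream": "t1"},
          "offset": {"cdc.start.position.timestamp": "2024-07-01T10:46:16Z"}}
```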
Update the offset¶
You can edit only the following offset, which is defined per DynamoDB stream:
{
"partition": {
"stream": "<TableName>" // required
},
"offset": {
"cdc.start.position.timestamp": "<Timestamp>", // required
"cdc.start.position.timestamp.format": "" // optional
}
}
Considerations:
- You can change the offset for each DynamoDB stream individually. A task starts streaming data from the specified timestamp in the configured DynamoDB stream.
- If no records in the DynamoDB stream are older than the specified timestamp, all records from the stream are consumed again (equivalent to TRIM_HORIZON).
- If the specified timestamp is ahead of the most recent record in the corresponding DynamoDB stream, no existing records from the stream are consumed (equivalent to LATEST).
- A new DynamoDB table (leaseTable) is created in the customer’s AWS account on every offset modification after the connector enters the CDC phase.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- Do not modify offsets of TypeA or TypeB. Edit only TypeC offsets.
- If cdc.start.position.timestamp.format is not provided, it defaults to yyyy-MM-dd'T'HH:mm:ss'Z'.
- You must specify cdc.start.position.timestamp in the format given by cdc.start.position.timestamp.format (configured or default).
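As an illustration of the formatting requirement, the following sketch builds a TypeC offset entry whose timestamp follows the default pattern yyyy-MM-dd'T'HH:mm:ss'Z'. The function name stream_offset is hypothetical, and converting the timestamp to UTC before formatting is an assumption, since the pattern treats Z as a literal character.

```python
from datetime import datetime, timezone
from typing import Any, Dict

# Python equivalent of the default Java pattern yyyy-MM-dd'T'HH:mm:ss'Z'
DEFAULT_PATTERN = "%Y-%m-%dT%H:%M:%SZ"


def stream_offset(table_name: str, start: datetime) -> Dict[str, Any]:
    """Build an editable (TypeC) offset entry for one DynamoDB stream."""
    if start.tzinfo is None:
        raise ValueError("start must be timezone-aware")
    # Assumption: the timestamp is interpreted as UTC.
    utc_start = start.astimezone(timezone.utc)
    return {
        "partition": {"stream": table_name},
        "offset": {
            "cdc.start.position.timestamp": utc_start.strftime(DEFAULT_PATTERN),
        },
    }


entry = stream_offset("NumTable", datetime(2024, 7, 1, 10, 46, 16, tzinfo=timezone.utc))
```

Omitting cdc.start.position.timestamp.format, as done here, relies on the default format. If you set a custom format, the timestamp string must be produced with the matching pattern.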
JSON payload¶
The following table describes the unique fields in the JSON payload for managing offsets of the DynamoDB CDC Source connector.
Field | Definition | Required/Optional |
---|---|---|
stream | DynamoDB stream configured for CDC. | Required |
cdc.start.position.timestamp | Used to specify the timestamp in the stream where a new application should start from. | Required |
cdc.start.position.timestamp.format | The format of the timestamp in the stream where a new application should start from. The format should abide by the patterns specified in java.time.format.DateTimeFormatter. The default is yyyy-MM-dd'T'HH:mm:ss'Z'. | Optional |
Connector events¶
This section describes the events the Amazon DynamoDB CDC connector creates in different scenarios.
Schemas and records created with the Avro converter¶
The Amazon DynamoDB CDC connector creates events the Avro converter can leverage to manage schemas and records effectively.
Consider the following mapping and data representation:
Topic name: <tableName> (you can change this using the TopicRouting SMTs)
Topic schema:
{
  "schema": {
    "connect.name": "io.confluent.connect.dynamodb.<tableName>.Envelope",
    "fields": [
      { "name": "op", "type": "string" },
      { "name": "ts_ms", "type": "long" },
      { "name": "ts_us", "type": "long" },
      { "name": "ts_ns", "type": "long" },
      {
        "default": null,
        "name": "source",
        "type": [
          "null",
          {
            "connect.name": "io.confluent.connector.dynamodb.Source",
            "fields": [
              { "default": null, "name": "version", "type": [ "null", "string" ] },
              { "name": "tableName", "type": "string" },
              { "name": "sync_mode", "type": "string" },
              { "name": "ts_ms", "type": "long" },
              { "name": "ts_us", "type": "long" },
              { "name": "ts_ns", "type": "long" },
              { "default": null, "name": "snapshotStartTime", "type": [ "null", "long" ] },
              { "default": null, "name": "snapshotCount", "type": [ "null", "int" ] },
              { "default": null, "name": "segment", "type": [ "null", "int" ] },
              { "default": null, "name": "totalSegments", "type": [ "null", "int" ] },
              { "default": null, "name": "shard_Id", "type": [ "null", "string" ] },
              { "default": null, "name": "seq_No", "type": [ "null", "string" ] }
            ],
            "name": "Source",
            "namespace": "io.confluent.connector.dynamodb",
            "type": "record"
          }
        ]
      },
      {
        "default": null,
        "name": "before",
        "type": [
          "null",
          {
            "connect.name": "io.confluent.connect.dynamodb.<tableName>.Value",
            "fields": [
              { "default": null, "name": "document", "type": [ "null", "string" ] }
            ],
            "name": "Value",
            "type": "record"
          }
        ]
      },
      { "default": null, "name": "after", "type": [ "null", "Value" ] }
    ],
    "name": "Envelope",
    "namespace": "io.confluent.connect.dynamodb.<tableName>",
    "type": "record"
  }
}
The following table describes each of the field names shown in the previous example.
Field Name | Description |
---|---|
schema | The value’s schema, which describes the structure of the value’s payload. For a particular table, all events will have the same schema. |
connect.name | The value’s schema, which describes the structure of the value’s payload. For a particular table, all events will have the same schema. |
op | Mandatory string that describes the type of operation that caused the connector to generate the event. Valid values, as shown in the example records in this section, are r (snapshot read), c (create), u (update), and d (delete). |
ts_ms, ts_us, ts_ns | Fields that display the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. In the source object, ts_ms indicates the approximate timestamp at which the change event was added to the DynamoDB stream. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the DynamoDB stream and the connector. |
source | Mandatory field that describes the source metadata for the event. This field contains information that you can use to compare this event with other events, with regard to the origin of the events and the order in which the events occurred. The source metadata includes the fields defined in the Source schema: version, tableName, sync_mode, ts_ms, ts_us, ts_ns, snapshotStartTime, snapshotCount, segment, totalSegments, shard_Id, and seq_No. |
before | An optional field that specifies the state of the row before the event occurred. This field is populated only with CDC events; in the example records in this section, it is set for op: u and op: d. This field would only be populated when |
after | An optional field that specifies the state of the row after the event occurred. In the example records in this section, it is populated for op: r, op: c, and op: u. |
connect.name | Names of schemas for |
The connector creates different kinds of events in the following scenarios:
Example record: "op: r"
{
"op": "r",
"ts_ms": 1719830776204,
"ts_us": 1719830776204000,
"ts_ns": 1719830776204000000,
"source": {
"io.confluent.connector.dynamodb.Source": {
"version": null,
"tableName": "NumTable",
"sync_mode": "SNAPSHOT",
"ts_ms": 1719830776204,
"ts_us": 1719830776204000,
"ts_ns": 1719830776204000000,
"snapshotStartTime": {
"long": 1719830776087
},
"snapshotCount": {
"int": 1
},
"segment": {
"int": 0
},
"totalSegments": {
"int": 5
},
"shard_Id": null,
"seq_No": null
}
},
"before": null,
"after": {
"io.confluent.connect.dynamodb.NumTable.Value": {
"document": {
"string": "{\"Number2\":{\"N\":\"1\"},\"Number\":{\"N\":\"1\"}}"
}
}
}
}
Example record 1: "op: c"
{
"op": "c",
"ts_ms": 1719830940000,
"ts_us": 1719830940000000,
"ts_ns": 1719830940000000,
"source": {
"io.confluent.connector.dynamodb.Source": {
"version": null,
"tableName": "NumTable",
"sync_mode": "CDC",
"ts_ms": 1719830973951,
"ts_us": 1719830973951000,
"ts_ns": 1719830973951000000,
"snapshotStartTime": null,
"snapshotCount": null,
"segment": null,
"totalSegments": null,
"shard_Id": {
"string": "shardId-00000001719827428220-04b3c3e0"
},
"seq_No": {
"string": "000000000000000000247"
}
}
},
"before": null,
"after": {
"io.confluent.connect.dynamodb.NumTable.Value": {
"document": {
"string": "{\"Number2\":{\"N\":\"5\"},\"Number\":{\"N\":\"1\"}}"
}
}
}
}
Example record 2: "op: u"
{
"op": "u",
"ts_ms": 1719830820000,
"ts_us": 1719830820000000,
"ts_ns": 1719830820000000,
"source": {
"io.confluent.connector.dynamodb.Source": {
"version": null,
"tableName": "NumTable",
"sync_mode": "CDC",
"ts_ms": 1719830875579,
"ts_us": 1719830875579000,
"ts_ns": 1719830875579000000,
"snapshotStartTime": null,
"snapshotCount": null,
"segment": null,
"totalSegments": null,
"shard_Id": {
"string": "shardId-00000001719827428220-04b3c3e0"
},
"seq_No": {
"string": "000000000000000000234"
}
}
},
"before": {
"io.confluent.connect.dynamodb.NumTable.Value": {
"document": {
"string": "{\"Number2\":{\"N\":\"3\"},\"Number\":{\"N\":\"1\"}}"
}
}
},
"after": {
"io.confluent.connect.dynamodb.NumTable.Value": {
"document": {
"string": "{\"Number2\":{\"N\":\"3\"},\"Number\":{\"N\":\"1\"},\"ONE\":{\"SS\":[\"NE\"]}}"
}
}
}
}
Example record 3: "op: d"
{
"op": "d",
"ts_ms": 1719830820000,
"ts_us": 1719830820000000,
"ts_ns": 1719830820000000,
"source": {
"io.confluent.connector.dynamodb.Source": {
"version": null,
"tableName": "NumTable",
"sync_mode": "CDC",
"ts_ms": 1719830865036,
"ts_us": 1719830865036000,
"ts_ns": 1719830865036000000,
"snapshotStartTime": null,
"snapshotCount": null,
"segment": null,
"totalSegments": null,
"shard_Id": {
"string": "shardId-00000001719827428220-04b3c3e0"
},
"seq_No": {
"string": "000000000000000000232"
}
}
},
"before": {
"io.confluent.connect.dynamodb.NumTable.Value": {
"document": {
"string": "{\"Number2\":{\"N\":\"4\"},\"Number\":{\"N\":\"1\"},\"ONE\":{\"SS\":[\"NE\"]},\"ONE \":{\"SS\":[\"NE\",\"ONE\"]}}"
}
}
},
"after": null
}
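In the example records above, nullable fields are wrapped in a single-key object that names the type (for example, { "string": ... } or { "long": ... }), and the item image is carried as a JSON string in the document field. The following sketch shows one way a consumer could unwrap such a record and parse the item image. It assumes records have already been read into Python dictionaries in this JSON form; the function names union_value and item_image are hypothetical.

```python
import json
from typing import Any, Dict, Optional


def union_value(node: Any) -> Any:
    """Unwrap a nullable field encoded as null or a single-key {type: value} object."""
    if node is None:
        return None
    if isinstance(node, dict) and len(node) == 1:
        return next(iter(node.values()))
    raise ValueError(f"not a wrapped union value: {node!r}")


def item_image(record: Dict[str, Any], side: str) -> Optional[Dict[str, Any]]:
    """Return the parsed DynamoDB item for 'before' or 'after', or None."""
    value = union_value(record.get(side))  # the Value record, or None
    if value is None:
        return None
    document = union_value(value["document"])  # JSON string, or None
    return None if document is None else json.loads(document)


create_event = {
    "op": "c",
    "before": None,
    "after": {
        "io.confluent.connect.dynamodb.NumTable.Value": {
            "document": {"string": '{"Number2":{"N":"5"},"Number":{"N":"1"}}'}
        }
    },
}
after_image = item_image(create_event, "after")
before_image = item_image(create_event, "before")
```

The parsed image keeps the DynamoDB attribute-value notation, so a numeric attribute appears as {"N": "5"} rather than as the number 5.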
Quick start¶
Use this quick start to get up and running with the Amazon DynamoDB CDC Source connector for Confluent Cloud.
Prerequisites¶
Ensure you have met the following requirements before moving ahead.
- Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS).
- The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
- Authorized access to AWS and the Amazon DynamoDB database. For more information, see DynamoDB IAM policy.
- For networking considerations, see Networking and DNS. To use a set of public egress IP addresses, see Public Egress IP Addresses for Confluent Cloud Connectors.
- Kafka cluster credentials. The following lists the different ways you can provide credentials.
- Enter an existing service account resource ID.
- Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
- Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
Using the Confluent Cloud Console¶
Step 1: Launch your Confluent Cloud cluster¶
See the Quick Start for Confluent Cloud for installation instructions.
Step 2: Add a connector¶
In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.
Step 4: Enter the connector details¶
Note
- Ensure you have all your prerequisites completed.
- An asterisk ( * ) designates a required entry.
At the Add Amazon DynamoDB CDC Source Connector screen, complete the following:
- Select the way you want to provide Kafka Cluster credentials. You can
choose one of the following options:
- My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.
- Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.
- Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.
- Click Continue.
- Under Amazon credentials, select how you want to authenticate with AWS:
- If you select Access Keys, enter your AWS credentials in the Amazon Access Key ID and Amazon Secret Access Key fields to connect to Amazon DynamoDB. For information about how to set these up, see Access Keys.
- If you select IAM Roles, choose an existing integration name under Provider integration name dropdown that has access to your resource. For more information, see Quick Start for Confluent Cloud Provider Integration.
- Click Continue.
Note that configuration properties that are not shown in the Cloud Console use the default values. For all property values and definitions, see Configuration properties.
Select the output record value format: Avro, JSON Schema, Protobuf. A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON Schema, or Protobuf). For additional information, see Schema Registry Enabled Environments.
In the AWS DynamoDB API Endpoint field, enter the AWS DynamoDB API endpoint.
Select a table sync mode from the DynamoDB Table Sync Mode dropdown list. Valid values are:
- CDC: Perform CDC only.
- SNAPSHOT: Perform a snapshot only.
- SNAPSHOT_CDC (Default): The connector starts with a snapshot and then switches to CDC mode upon completion.
Select a table discovery mode from the Table Discovery Mode dropdown list. Valid values are:
- INCLUDELIST: A comma-separated list of DynamoDB table names to be captured. This is required if dynamodb.table.discovery.mode is set to INCLUDELIST.
- TAG: A semicolon-separated list of pairs in the form <tag-key>:<value-1>,<value-2> that is used to create tag filters. For example, key1:v1,v2;key2:v3,v4 will include all tags that match key1 with a value of either v1 or v2, and match key2 with a value of either v3 or v4. Any keys not specified will be excluded.
In the Tables Include List field, enter a comma-separated list of DynamoDB table names to be captured. Note that this is required if dynamodb.table.discovery.mode is set to INCLUDELIST.
Show advanced configurations
Schema context: Select a schema context to use for this connector, if using a schema-based data format. This property defaults to the Default context, which configures the connector to use the default schema set up for Schema Registry in your Confluent Cloud environment. A schema context allows you to use separate schemas (like schema sub-registries) tied to topics in different Kafka clusters that share the same Schema Registry environment. For example, if you select a non-default context, a Source connector uses only that schema context to register a schema and a Sink connector uses only that schema context to read from. For more information about setting up a schema context, see What are schema contexts and when should you use them?.
CDC Details
Prefix for CDC Checkpointing table: Prefix for CDC checkpointing tables; must be unique per connector. The checkpointing table stores the last processed record for each shard and is used to resume from the last processed record if the connector restarts. This is applicable only in CDC mode.
CDC Checkpointing Table Billing Mode: Defines the billing mode for the internal checkpoint table created with CDC. Valid values are PROVISIONED and PAY_PER_REQUEST. Default is PROVISIONED. Use PAY_PER_REQUEST for unpredictable application traffic and on-demand billing mode. Use PROVISIONED for predictable application traffic and provisioned billing mode.
Max number of records per DynamoDB Streams poll: The maximum number of records that can be returned in a single DynamoDB Streams getRecords operation. Only applicable in the CDC phase. Default value is 5000.
Snapshot Details
Max records per Table Scan: Maximum number of records that can be returned in a single DynamoDB read operation. Only applicable to the SNAPSHOT phase. Note that there is a 1 MB size limit as well.
Snapshot Table RCU consumption percentage: Configure the percentage of table read capacity that will be used as a maximum limit of RCU consumption rate.
DynamoDB Details
Maximum batch size: The maximum number of records the connector will wait for before publishing the data on the topic. The connector may still return fewer records if no additional records are available.
Poll linger milliseconds: The maximum time to wait for a record before returning an empty batch. The default is 5 seconds.
Processing position
Define a specific offset position for this connector to begin processing data from by clicking Set offsets. For more information on managing offsets, see Manage Offsets for Fully-Managed Connectors in Confluent Cloud.
Transforms and Predicates
For details, see the Single Message Transforms (SMT) documentation. For all property values and definitions, see Configuration properties.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- To change the number of recommended tasks, enter the number of tasks for the connector to use in the Tasks field.
- Click Continue.
Verify the connection details.
Click Launch.
The status for the connector should go from Provisioning to Running.
Step 5: Check the Kafka topic.¶
After the connector is running, verify that messages are populating your Kafka topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
Using the Confluent CLI¶
You can use the following steps to set up and run the connector using the Confluent CLI, but first, ensure you have met all prerequisites.
Step 1: List the available connectors¶
Enter the following command to list available connectors:
confluent connect plugin list
Step 2: List the connector configuration properties¶
Enter the following command to show the connector configuration properties:
confluent connect plugin describe <connector-plugin-name>
The command output shows the required and optional configuration properties.
Step 3: Create the connector configuration file¶
Create a JSON file that contains the connector configuration properties. The following example shows required and optional connector properties.
{
"config": {
"connector.class": "DynamoDbCdcSource",
"name": "DynamoDbCdcSourceConnector_0",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"schema.context.name": "default",
"aws.access.key.id": "<my-aws-access-key-id>",
"aws.secret.access.key": "<my-aws-secret-access-key>",
"output.data.format": "JSON_SR",
"dynamodb.service.endpoint": "http://localhost:8080",
"dynamodb.table.discovery.mode": "INCLUDELIST",
"dynamodb.table.sync.mode": "SNAPSHOT_CDC",
"dynamodb.table.includelist": "t1, t2",
"max.batch.size": "1000",
"poll.linger.ms": "5000",
"dynamodb.snapshot.max.poll.records": "1000",
"dynamodb.cdc.checkpointing.table.prefix": "connect-KCL-",
"dynamodb.cdc.table.billing.mode": "PROVISIONED",
"dynamodb.cdc.max.poll.records": "5000",
"tasks.max": "1"
}
}
For all property values and definitions, see Configuration properties.
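Before creating the connector, it can help to check a configuration against the valid values and ranges listed under Configuration properties. The following sketch does this for a handful of properties. The function name validate is hypothetical, the check is not exhaustive, and it is no substitute for the validation Confluent Cloud performs.

```python
from typing import Any, Dict, List

# property -> (allowed values, documented default)
CHOICES = {
    "kafka.auth.mode": ({"KAFKA_API_KEY", "SERVICE_ACCOUNT"}, "KAFKA_API_KEY"),
    "output.data.format": ({"AVRO", "JSON_SR", "PROTOBUF"}, "AVRO"),
    "dynamodb.table.sync.mode": ({"SNAPSHOT", "CDC", "SNAPSHOT_CDC"}, "SNAPSHOT_CDC"),
    "dynamodb.table.discovery.mode": ({"TAG", "INCLUDELIST"}, "INCLUDELIST"),
    "dynamodb.cdc.table.billing.mode": ({"PROVISIONED", "PAY_PER_REQUEST"}, "PROVISIONED"),
}
# property -> (minimum, maximum, documented default)
RANGES = {
    "max.batch.size": (100, 10000, 1000),
    "poll.linger.ms": (0, 300000, 5000),
}


def validate(config: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means no issue found."""
    problems: List[str] = []
    for key, (allowed, default) in CHOICES.items():
        value = config.get(key, default)
        if value not in allowed:
            problems.append(f"{key}: {value!r} is not one of {sorted(allowed)}")
    for key, (low, high, default) in RANGES.items():
        raw = config.get(key, default)
        try:
            number = int(raw)
        except (TypeError, ValueError):
            problems.append(f"{key}: {raw!r} is not an integer")
            continue
        if not low <= number <= high:
            problems.append(f"{key}: {number} is outside [{low}, {high}]")
    mode = config.get("dynamodb.table.discovery.mode", "INCLUDELIST")
    if mode == "INCLUDELIST" and not config.get("dynamodb.table.includelist"):
        problems.append("dynamodb.table.includelist is required in INCLUDELIST mode")
    if mode == "TAG" and not config.get("aws.resource.tagging.endpoint"):
        problems.append("aws.resource.tagging.endpoint is required in TAG mode")
    return problems


good = {"output.data.format": "AVRO", "dynamodb.table.includelist": "t1, t2",
        "max.batch.size": "1000", "poll.linger.ms": "5000"}
bad = {"output.data.format": "JSON", "dynamodb.table.includelist": "t1",
       "max.batch.size": "50"}
good_problems = validate(good)
bad_problems = validate(bad)
```

To check a configuration file, load it with json.load and pass the object stored under the config key to validate.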
Single Message Transforms: For details about adding SMTs using the Confluent CLI, see the Single Message Transformation (SMT) documentation. For a list of SMTs that are not supported with this connector, see Unsupported transformations.
Step 4: Load the configuration file and create the connector¶
Enter the following command to load the configuration and start the connector:
confluent connect cluster create --config-file <file-name>.json
For example:
confluent connect cluster create --config-file dynamodb-cdc-source-config.json
Example output:
Created connector DynamoDbCdcSourceConnector_0 lcc-ix4dl
Step 5: Check the connector status¶
Enter the following command to check the connector status:
confluent connect cluster list
Example output:
ID | Name | Status | Type
+-----------+-----------------------------+---------+------+
lcc-ix4dl | DynamoDbCdcSourceConnector_0| RUNNING | source
Step 6: Check the Kafka topic.¶
After the connector is running, verify that messages are populating your Kafka topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
Configuration properties¶
Use the following configuration properties with the fully-managed connector.
How should we connect to your data?¶
name
Sets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
- Importance: high
kafka.api.key
Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- Type: string
- Importance: high
kafka.api.secret
Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
Schema Config¶
key.subject.name.strategy
Determines how to construct the subject name under which the key schema is registered with Schema Registry.
- Type: string
- Default: TopicNameStrategy
- Valid Values: RecordNameStrategy, TopicNameStrategy
- Importance: medium
schema.context.name
Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.
- Type: string
- Default: default
- Importance: medium
value.subject.name.strategy
Determines how to construct the subject name under which the value schema is registered with Schema Registry.
- Type: string
- Default: TopicNameStrategy
- Valid Values: RecordNameStrategy, TopicNameStrategy, TopicRecordNameStrategy
- Importance: medium
key.converter.reference.subject.name.strategy
Set the subject reference name strategy for key. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.
- Type: string
- Default: DefaultReferenceSubjectNameStrategy
- Importance: medium
value.converter.reference.subject.name.strategy
Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.
- Type: string
- Default: DefaultReferenceSubjectNameStrategy
- Importance: medium
AWS credentials¶
authentication.method
Select how you want to authenticate with AWS.
- Type: string
- Default: Access Keys
- Importance: high
aws.access.key.id
- Type: password
- Importance: high
provider.integration.id
Select an existing integration that has access to your resource. In case you need to integrate a new IAM role, use provider integration
- Type: string
- Importance: high
aws.secret.access.key
- Type: password
- Importance: high
Output messages¶
output.data.format
Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF
- Type: string
- Default: AVRO
- Importance: high
output.data.key.format
Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- Type: string
- Default: AVRO
- Valid Values: AVRO, JSON_SR, PROTOBUF
- Importance: high
DynamoDB Details¶
dynamodb.service.endpoint
AWS DynamoDB API Endpoint
- Type: string
- Importance: high
dynamodb.table.sync.mode
Define table sync mode. SNAPSHOT_CDC - start with SNAPSHOT and switch to CDC mode on completion; SNAPSHOT - perform SNAPSHOT only; CDC - perform CDC only
- Type: string
- Default: SNAPSHOT_CDC
- Importance: high
dynamodb.table.discovery.mode
Specifies the table discovery mode that defines the list of tables to be captured.
TAG - use resource tag discovery using AWS Resource Tagging endpoint.
INCLUDELIST - use explicit table names to be included
- Type: string
- Default: INCLUDELIST
- Importance: high
aws.resource.tagging.endpoint
AWS Resource Group Tag API Endpoint. Required if dynamodb.table.discovery.mode is set to TAG.
- Type: string
- Importance: high
dynamodb.table.tag.filters
A semicolon-separated list of pairs in the form <tag-key>:<value-1>,<value-2> that is used to create tag filters. For example, key1:v1,v2;key2:v3,v4 includes all tags that match key1 with a value of either v1 or v2, and match key2 with a value of either v3 or v4. Any keys not specified are excluded.
- Type: string
- Importance: high
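The filter syntax above can be illustrated with a short Python sketch. This is not the connector's own parser; the function names parse_tag_filters and table_matches are hypothetical, and the matching rule (every listed key must be present with one of its listed values) is an assumption based on the description above.

```python
def parse_tag_filters(spec: str) -> dict[str, list[str]]:
    """Parse 'key1:v1,v2;key2:v3,v4' into {'key1': ['v1', 'v2'], 'key2': ['v3', 'v4']}."""
    filters: dict[str, list[str]] = {}
    for pair in spec.split(";"):
        pair = pair.strip()
        if not pair:
            continue  # tolerate a trailing semicolon
        key, sep, values = pair.partition(":")
        if not sep or not key.strip():
            raise ValueError(f"expected <tag-key>:<value-1>,<value-2>, got {pair!r}")
        filters[key.strip()] = [v.strip() for v in values.split(",") if v.strip()]
    return filters


def table_matches(tags: dict[str, str], filters: dict[str, list[str]]) -> bool:
    """Assumed rule: each filter key must be present with one of its allowed values."""
    return all(tags.get(key) in allowed for key, allowed in filters.items())


filters = parse_tag_filters("key1:v1,v2;key2:v3,v4")
print(filters)
print(table_matches({"key1": "v2", "key2": "v3"}, filters))
```

A table tagged only with key1 would not match under this reading, because key2 is also listed in the filter.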
dynamodb.table.includelist
A comma-separated list of DynamoDB table names to be captured. This is required if dynamodb.table.discovery.mode is set to INCLUDELIST.
- Type: string
- Importance: high
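The dependency between the discovery mode and its companion properties can be checked before a configuration is submitted. The following Python sketch is only an illustration of the rules stated above; check_discovery_config is a hypothetical helper, and the table names in the example are made up.

```python
def check_discovery_config(config: dict) -> list:
    """Return a list of problems found in the table discovery settings."""
    problems = []
    # INCLUDELIST is the documented default for the discovery mode.
    mode = config.get("dynamodb.table.discovery.mode", "INCLUDELIST")
    if mode == "TAG":
        if not config.get("aws.resource.tagging.endpoint"):
            problems.append("aws.resource.tagging.endpoint is required in TAG mode")
    elif mode == "INCLUDELIST":
        if not config.get("dynamodb.table.includelist"):
            problems.append("dynamodb.table.includelist is required in INCLUDELIST mode")
    else:
        problems.append(f"unknown discovery mode: {mode}")
    return problems


# Hypothetical table names, for illustration only.
example = {
    "dynamodb.table.discovery.mode": "INCLUDELIST",
    "dynamodb.table.includelist": "orders,customers",
}
print(check_discovery_config(example))
```

An empty list means the discovery settings are consistent with the requirements described in this section; it does not validate any other property.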
max.batch.size
The maximum number of records that will be returned by the connector to Connect. The connector may still return fewer records if no additional records are available.
- Type: int
- Default: 1000
- Valid Values: [100,…,10000]
- Importance: medium
poll.linger.ms
The maximum time to wait for a record before returning an empty batch. The call to poll can return early before poll.linger.ms expires if max.batch.size records are received.
- Type: long
- Default: 5000 (5 seconds)
- Valid Values: [0,…,300000]
- Importance: medium
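One way to picture how max.batch.size and poll.linger.ms interact is the following Python sketch. It is a simplified model under stated assumptions, not the connector's implementation: poll_once and the in-memory queue standing in for the record source are hypothetical.

```python
import queue
import time


def poll_once(source: queue.Queue, max_batch_size: int = 1000, poll_linger_ms: int = 5000) -> list:
    """Collect records until the batch is full or the linger time expires."""
    deadline = time.monotonic() + poll_linger_ms / 1000
    batch = []
    while len(batch) < max_batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # linger time expired: return what we have, possibly nothing
        try:
            batch.append(source.get(timeout=remaining))
        except queue.Empty:
            break  # no record arrived before the deadline
    return batch


records = queue.Queue()
for i in range(5):
    records.put(i)

# Returns as soon as three records are collected, well before the linger time.
print(poll_once(records, max_batch_size=3, poll_linger_ms=1000))
```

In this model a larger poll.linger.ms trades latency for fuller batches, while a larger max.batch.size only matters when records arrive faster than they are returned.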
Snapshot Details¶
dynamodb.snapshot.max.poll.records
Maximum number of records that can be returned in a single DynamoDB read operation. Only applicable to the SNAPSHOT phase. Note that a 1 MB size limit also applies.
- Type: int
- Default: 1000
- Importance: medium
dynamodb.snapshot.max.rcu.percentage
The percentage of the table's read capacity that is used as the maximum limit for the RCU consumption rate.
- Type: int
- Default: 50
- Importance: medium
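As a worked example of the percentage above, the following Python sketch computes the resulting read budget. The helper snapshot_rcu_budget is hypothetical, and the accepted range of 1 to 100 is an assumption, since the valid values are not listed here.

```python
def snapshot_rcu_budget(table_rcu: int, max_rcu_percentage: int = 50) -> float:
    """Upper bound on RCUs per second that a snapshot may consume."""
    # Assumed range; the property reference does not state valid values.
    if not 0 < max_rcu_percentage <= 100:
        raise ValueError("max_rcu_percentage must be between 1 and 100")
    return table_rcu * max_rcu_percentage / 100


# A table provisioned with 1000 RCUs and the default of 50 percent.
print(snapshot_rcu_budget(1000))
```

With the default setting, half of the table's read capacity remains available to other readers during the snapshot.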
CDC Details¶
dynamodb.cdc.initial.stream.position
Specifies the position in the stream where a new application should start from. This is used during initial application bootstrap (when a checkpoint doesn’t exist for a shard or its parents). Used only in CDC mode. TRIM_HORIZON - Start from the oldest available data record. LATEST - Start after the most recent data record (fetch new data). AT_TIMESTAMP - Start from the record at or after the specified server-side timestamp.
- Type: string
- Importance: medium
cdc.start.position.timestamp
Specifies the timestamp in the stream where a new application should start from. This is used during initial application bootstrap (when a checkpoint doesn’t exist for a shard or its parents). Used only in CDC mode.
- Type: string
- Importance: medium
cdc.start.position.timestamp.format
The format of the timestamp in the stream where a new application should start from. The format should abide by the patterns specified in java.time.format.DateTimeFormatter: https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#patterns. This is used during initial application bootstrap (when a checkpoint doesn’t exist for a shard or its parents). Used only in CDC mode.
- Type: string
- Default: yyyy-MM-dd'T'HH:mm:ss'Z'
- Importance: medium
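A value for cdc.start.position.timestamp that matches the default pattern yyyy-MM-dd'T'HH:mm:ss'Z' can be produced as in the following Python sketch. The helper format_start_timestamp is hypothetical. Because the Z in the pattern is a quoted literal, the sketch assumes the timestamp is meant to be expressed in UTC; that interpretation is not confirmed by the property description.

```python
from datetime import datetime, timezone

# strftime equivalent of the Java pattern yyyy-MM-dd'T'HH:mm:ss'Z'
PY_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def format_start_timestamp(moment: datetime) -> str:
    """Render a timezone-aware datetime in the default timestamp format."""
    if moment.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    # Convert to UTC first (assumption: the literal Z denotes UTC).
    return moment.astimezone(timezone.utc).strftime(PY_FORMAT)


print(format_start_timestamp(datetime(2024, 1, 15, 8, 30, 0, tzinfo=timezone.utc)))
```

If a different pattern is set in cdc.start.position.timestamp.format, the strftime string has to be adjusted to match it.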
dynamodb.cdc.checkpointing.table.prefix
Prefix for CDC checkpointing tables. Must be unique per connector. The checkpointing table stores the last processed record for each shard and is used to resume from the last processed record if the connector restarts. This is applicable only in CDC mode.
- Type: string
- Default: connect-KCL-
- Importance: medium
dynamodb.cdc.table.billing.mode
Defines the billing mode for the internal checkpoint table created with CDC. Allowed values: PROVISIONED, PAY_PER_REQUEST. Default is PROVISIONED. Use PAY_PER_REQUEST for unpredictable application traffic and on-demand billing mode. Use PROVISIONED for predictable application traffic and provisioned billing mode.
- Type: string
- Default: PROVISIONED
- Importance: medium
dynamodb.cdc.max.poll.records
Maximum number of records that can be returned in a single DynamoDB Streams GetRecords call.
- Type: int
- Default: 5000
- Importance: medium
dynamodb.cdc.checkpointing.table.read.capacity
Read capacity for CDC checkpointing table. The checkpointing table is used to track the leases (shards) of the DynamoDB tables and helps determine the resume position in case of connector restarts.
- Type: int
- Default: 50
- Importance: low
dynamodb.cdc.checkpointing.table.write.capacity
Write capacity for CDC checkpointing table. The checkpointing table is used to track the leases (shards) of the DynamoDB tables and helps determine the resume position in case of connector restarts.
- Type: int
- Default: 50
- Importance: low
Number of tasks for this connector¶
tasks.max
Maximum number of tasks for the connector.
- Type: int
- Valid Values: [1,…]
- Importance: high
Next steps¶
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.