Amazon DynamoDB Sink Connector¶
The Kafka Connect DynamoDB Sink Connector is used to export messages from Apache Kafka® to Amazon DynamoDB, allowing you to move your Kafka data into a DynamoDB key-value and document database.
The connector periodically polls data from Kafka and writes it to DynamoDB. The data from each Kafka topic is batched and sent to DynamoDB. Due to DynamoDB constraints, each batch can only contain one change per key, and each failure in a batch must be handled before the next batch is processed to ensure exactly-once guarantees. When a table doesn't exist, the DynamoDB Sink connector creates the table dynamically, depending on configuration and permissions.
Features¶
The DynamoDB Sink connector for Confluent Platform includes the following features:
Exactly once delivery¶
The DynamoDB Sink connector guarantees exactly once delivery using its internal retry policy on a per-batch basis and DynamoDB's natural deduplication of messages, as long as ordering is guaranteed. However, this requires that the primary key used by the connector be located on a single Kafka partition. Also, an overwrite with the same data should not trigger a change in DynamoDB Streams.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Multiple tasks¶
The DynamoDB Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can lead to significant performance gains when the connector consumes from multiple topic partitions.
Type conversion¶
The connector dynamically converts basic types and complex structures into the equivalent DynamoDB types and structures.
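For example, here is a minimal sketch of this mapping, assuming the standard DynamoDB attribute-value notation (strings map to S, numbers to N, nested structures to M); the field names are illustrative and taken from the example record later on this page:

Kafka record value:
{"itemid": "Item_117", "orderunits": 1.13, "address": {"city": "City_43"}}

Approximate DynamoDB item in attribute-value form:
{"itemid": {"S": "Item_117"}, "orderunits": {"N": "1.13"}, "address": {"M": {"city": {"S": "City_43"}}}}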
Record Id¶
The connector allows the specification of a primary key based on the record's Kafka fields. See the configuration options for more information.
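For example, to use the record value's orderid field (an illustrative field name, used again in the examples later on this page) as the table's hash key, you could set:

aws.dynamodb.pk.hash=value.orderid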
Limitations¶
The connector does not currently support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:
io.debezium.transforms.ByLogicalTableRouter
io.debezium.transforms.outbox.EventRouter
org.apache.kafka.connect.transforms.RegexRouter
org.apache.kafka.connect.transforms.TimestampRouter
io.confluent.connect.transforms.MessageTimestampRouter
io.confluent.connect.transforms.ExtractTopic$Key
io.confluent.connect.transforms.ExtractTopic$Value
Install the Amazon DynamoDB Sink Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
An installation of the latest (latest) connector version.

To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-aws-dynamodb:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-aws-dynamodb:1.0.0-preview
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Amazon DynamoDB Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
AWS Credentials¶
The following sections provide information about how to configure the AWS DynamoDB connector to provide credentials when connecting to AWS.
Credentials provider chain¶
By default, the AWS DynamoDB connector looks for credentials in the following locations and in the following order:

1. The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables accessible to the Connect worker processes where the connector will be deployed. These variables are recognized by the AWS CLI and all AWS SDKs (except for the AWS SDK for .NET). You can use the export command to set these variables.

   export AWS_ACCESS_KEY_ID=<your_access_key_id>
   export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>

   The AWS_ACCESS_KEY and AWS_SECRET_KEY environment variables can be used instead, but are not recognized by the AWS CLI.

2. The aws.accessKeyId and aws.secretKey Java system properties on the Connect worker processes where the connector will be deployed. However, these properties are only recognized by the AWS SDK for Java and are not recommended.

3. The ~/.aws/credentials file located in the home directory of the operating system user that runs the Connect worker processes. These credentials are recognized by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

   aws configure

   You can also manually create the credentials file using a text editor. The file should contain lines in the format shown in the following example. See AWS Credentials File Format for more details.

   [default]
   aws_access_key_id = <your_access_key_id>
   aws_secret_access_key = <your_secret_access_key>

   Note
   When creating the credentials file, ensure the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in that user's home directory. Otherwise, the AWS DynamoDB connector will not be able to find the credentials.

4. A query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials. This is applicable only if the Connect worker processes are running in AWS containers.

5. A metadata query that returns credentials from an EC2 instance. This is applicable only if the Connect worker processes are running in EC2 instances.
Choose one of the methods above to define the AWS credentials that the AWS DynamoDB connectors use, verify the credentials implementation is set correctly, and then restart all of the Connect worker processes.
Note
Confluent recommends using either environment variables or a credentials file because these are the most straightforward methods and can be checked using the AWS CLI tool before running the connector.
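For example, one quick, connector-agnostic way to check which credentials the environment resolves is to run the AWS CLI from the same host and operating system user as the Connect worker:

aws sts get-caller-identity

If this command returns the expected account and user or role ARN, the same credentials should be visible to the connector.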
All AWS DynamoDB connectors run in a single Connect worker cluster and use the same credentials. This is sufficient for many use cases. If you want more control, refer to the following section to learn more about controlling and customizing how the AWS DynamoDB connector gets AWS credentials.
Caution
If you configure one of the AWS key and AWS secret key implementations (as mentioned above), credentials cannot be supplied through the following credentials providers or by using the Trusted Account Credentials implementation. Attempting to provide credentials through multiple implementations will result in an authentication failure.
Credentials providers¶
A credentials provider is a Java class that implements the com.amazonaws.auth.AWSCredentialsProvider interface in the AWS Java library and returns AWS credentials from the environment. By default, the AWS DynamoDB connector configuration property aws.dynamodb.credentials.provider.class uses the com.amazonaws.auth.DefaultAWSCredentialsProviderChain class. This class and interface implementation chains together five other credential provider classes.
The com.amazonaws.auth.DefaultAWSCredentialsProviderChain implementation looks for credentials in the following order:
1. Environment variables, using the com.amazonaws.auth.EnvironmentVariableCredentialsProvider class implementation. This implementation uses the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. The environment variables AWS_ACCESS_KEY and AWS_SECRET_KEY are also supported by this implementation; however, these two variables are only recognized by the AWS SDK for Java and are not recommended.

2. Java system properties, using the com.amazonaws.auth.SystemPropertiesCredentialsProvider class implementation. This implementation uses the Java system properties aws.accessKeyId and aws.secretKey.

3. Credentials file, using the com.amazonaws.auth.profile.ProfileCredentialsProvider class implementation. This implementation uses a credentials file located at ~/.aws/credentials. This credentials provider can be used by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

   aws configure

   You can also manually create the credentials file using a text editor. The file should contain lines in the format shown in the following example. See AWS Credentials File Format for more details.

   [default]
   aws_access_key_id = <your_access_key_id>
   aws_secret_access_key = <your_secret_access_key>

   Note
   When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user's home directory. Otherwise, the AWS DynamoDB connector will not be able to find the credentials.

4. Amazon Elastic Container Service (ECS) container credentials, using the com.amazonaws.auth.ContainerCredentialsProvider class implementation. This implementation uses a query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials for the AWS DynamoDB connector. For this provider to work, the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI must be set. See IAM Roles for Tasks for additional information about setting up this query.

5. EC2 instance profile credentials, using the com.amazonaws.auth.InstanceProfileCredentialsProvider class implementation. EC2 instance metadata is queried for credentials. See Amazon EC2 metadata service for additional information about instance metadata queries. See Working with AWS credentials for additional information and updates from AWS.

Note
EC2 instance profile credentials can be used only if the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is not set. See com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper for more information.
Using Trusted Account Credentials¶
This connector can assume a role and use credentials from a separate trusted account. This is a default feature provided with recent versions of this connector that include an updated version of the AWS SDK.
Important
You cannot use assumed role credentials to access AWS through a proxy server without first passing environment variables or system properties. This is due to an AWS SDK limitation.
After you create the trust relationship, an IAM user or an application from the trusted account can use the AWS Security Token Service (AWS STS) AssumeRole API operation. This operation provides temporary security credentials that enable access to AWS resources for the connector. For details, see Creating a Role to Delegate Permissions to an IAM User.

Example profile in ~/.aws/credentials:

[default]
role_arn=arn:aws:iam::037803949979:role/kinesis_cross_account_role
source_profile=staging
role_session_name = OPTIONAL_SESSION_NAME

[staging]
aws_access_key_id = <STAGING KEY>
aws_secret_access_key = <STAGING SECRET>
To allow the connector to assume a role with the right permissions, set the Amazon Resource Name (ARN) for this role. Additionally, you must choose between source_profile or credential_source as the way to get credentials that have permission to assume the role in the environment where the connector is running.
Note
When setting up trusted account credentials, be aware that the approach of loading profiles from both ~/.aws/credentials and ~/.aws/config does not work when configuring this connector. Assumed role settings and credentials must be placed in the ~/.aws/credentials file.
Additionally, the DynamoDB Sink connector implements the AwsAssumeRoleCredentialsProvider, which means you can use the following properties to configure the assume role operation.
aws.dynamodb.credentials.provider.class=io.confluent.connect.aws.dynamodb.auth.AwsAssumeRoleCredentialsProvider
aws.dynamodb.credentials.provider.sts.role.arn=arn:aws:iam::012345678901:role/my-restricted-role
aws.dynamodb.credentials.provider.sts.role.session.name=session-name
aws.dynamodb.credentials.provider.sts.role.external.id=external-id
Using Other Implementations¶
You can use a different credentials provider. To do this, set the aws.dynamodb.credentials.provider.class property to the name of any class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.

Complete the following steps to use a different credentials provider:

1. Find or create a Java credentials provider class that implements the com.amazonaws.auth.AWSCredentialsProvider interface (see the sketch after these steps).
2. Put the class file in a JAR file.
3. Place the JAR file in the share/java/kafka-connect-amazon-dynamodb directory on all Connect workers.
4. Restart the Connect workers.
5. Change the AWS DynamoDB connector properties file to use your custom credentials provider. Add the entry aws.dynamodb.credentials.provider.class=<className> to the AWS DynamoDB connector properties file.

Important
You must use the fully qualified class name in the <className> entry.
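The following is a minimal sketch of such a provider, assuming the AWS SDK for Java v1 classes referenced throughout this page; the package, class name, and environment variable names are hypothetical and only illustrate the interface:

package com.example.connect.credentials;  // hypothetical package

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;

// Hypothetical provider that reads keys from custom environment variables.
public class EnvVarCredentialsProvider implements AWSCredentialsProvider {

    @Override
    public AWSCredentials getCredentials() {
        // Read the (hypothetical) variables on every call so rotated keys are picked up.
        String accessKey = System.getenv("MY_DYNAMODB_ACCESS_KEY_ID");
        String secretKey = System.getenv("MY_DYNAMODB_SECRET_ACCESS_KEY");
        return new BasicAWSCredentials(accessKey, secretKey);
    }

    @Override
    public void refresh() {
        // No cached state, so nothing to refresh.
    }
}

Packaged in a JAR under share/java/kafka-connect-amazon-dynamodb, this class would then be referenced as aws.dynamodb.credentials.provider.class=com.example.connect.credentials.EnvVarCredentialsProvider.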
Quick Start¶
This quick start uses the DynamoDB connector to export data produced by the Avro console producer to DynamoDB.
Before you begin, make sure the user or IAM role that runs the connector has write and create access to DynamoDB.

Install the connector using the confluent connect plugin install command:

# run from your CP installation directory
confluent connect plugin install confluentinc/kafka-connect-aws-dynamodb:latest
Tip
By default, the plugin is installed into the share/confluent-hub-components directory and that directory is added to the plugin path. For the plugin path change to take effect, you must restart the Connect worker. For details about setting up and using the CLI, see AWS DynamoDB CLI.

Start the services using the Confluent CLI.
confluent local services start
Every service starts in order, printing a message with its status.
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
Note
You must ensure the connector user has write access to DynamoDB and that credentials are deployed appropriately. You can also pass additional properties to the credentials provider. For details, refer to AWS Credentials.
Start the Avro console producer to import a few records with a simple schema in Kafka. Use the following command:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic dynamodb_topic \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Enter the following in the console producer:
{"f1": "value1"} {"f1": "value2"} {"f1": "value3"} {"f1": "value4"} {"f1": "value5"} {"f1": "value6"} {"f1": "value7"} {"f1": "value8"} {"f1": "value9"}
The records are published to the Kafka topic dynamodb_topic in Avro format.

Find the region that the DynamoDB instance is running in (for example, us-east-2) and create a config file with the following contents. Save it as quickstart-dynamodb.properties.

Note
In the following example, a DynamoDB table called dynamodb_topic will be created in your DynamoDB instance.

name=dynamodb-sink
connector.class=io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector
tasks.max=1
topics=dynamodb_topic
# use the region to populate the next two properties
aws.dynamodb.region=<region>
aws.dynamodb.endpoint=https://dynamodb.<region>.amazonaws.com
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Start the DynamoDB connector by loading its configuration with the following command:
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local services connect connector load dynamodb-sink --config quickstart-dynamodb.properties

{
  "name": "dynamodb-sink",
  "config": {
    "connector.class": "io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector",
    "tasks.max": "1",
    "topics": "dynamodb_topic",
    "aws.dynamodb.region": "<region>",
    "aws.dynamodb.endpoint": "https://dynamodb.<region>.amazonaws.com",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "name": "dynamodb-sink"
  },
  "tasks": [],
  "type": "sink"
}
Important
Don’t use the Confluent CLI commands in production environments.
Confirm that the connector is in a RUNNING state.

confluent local services connect connector status dynamodb-sink
After the connector has ingested some records, use the AWS CLI to check that the data is available in DynamoDB.
aws dynamodb scan --table-name dynamodb_topic --region us-east-2
You should see nine items with keys:
{ "Items": [ { "partition": { "N": "0" },"offset": { "N": "0" },"name": { "S": "f1" },"type": { "S": "value1" } },{ "partition": { "N": "0" },"offset": { "N": "1" },"name": { "S": "f1" },"type": { "S": "value2" } },{ "partition": { "N": "0" },"offset": { "N": "2" },"name": { "S": "f1" },"type": { "S": "value3" } },{ "partition": { "N": "0" },"offset": { "N": "3" },"name": { "S": "f1" },"type": { "S": "value4" } },{ "partition": { "N": "0" },"offset": { "N": "4" },"name": { "S": "f1" },"type": { "S": "value5" } },{ "partition": { "N": "0" },"offset": { "N": "5" },"name": { "S": "f1" },"type": { "S": "value6" } },{ "partition": { "N": "0" },"offset": { "N": "6" },"name": { "S": "f1" },"type": { "S": "value7" } },{ "partition": { "N": "0" },"offset": { "N": "7" },"name": { "S": "f1" },"type": { "S": "value8" } },{ "partition": { "N": "0" },"offset": { "N": "8" },"name": { "S": "f1" },"type": { "S": "value9" } } ], "Count": 9, "ScannedCount": 9, "ConsumedCapacity": null }
Enter the following command to stop the Connect worker and all services:
confluent local stop
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Or, enter the following command to stop all services and delete all generated data:
confluent local destroy
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE
Use aws.dynamodb.pk.hash and aws.dynamodb.pk.sort¶
The following example will help you when configuring the aws.dynamodb.pk.hash and aws.dynamodb.pk.sort configuration properties:
{
"ordertime": 1511538140542,
"orderid": 3243,
"itemid": "Item_117",
"orderunits": 1.135368875862714,
"address": {
"city": "City_43",
"state": "State_53",
}
}
Given records like the previous one in a Kafka topic in Avro format, the following cases apply when running the DynamoDB Sink connector:

Case 1: If you run the DynamoDB Sink connector, your table will have two additional fields, as shown in the following example:

- A field for partition: "aws.dynamodb.pk.hash":"partition"
- A field for offset: "aws.dynamodb.pk.sort":"offset"

partition | offset | address | itemid | orderid | ordertime | orderunits
0 | 6075 | {"city":{"S":"City_66"},"state":{"S":"State_42"},...} | Item_246 | 6075 | 1503153618445 | 3.0818679447783652
0 | 6076 | {"city":{"S":"City_38"},"state":{"S":"State_49"},...} | Item_536 | 6076 | 1515872966736 | 1.6264301342871472
0 | 6077 | {"city":{"S":"City_32"},"state":{"S":"State_62"},...} | Item_997 | 6077 | 1515872966736 | 4.189731783402986

Case 2: If you run the DynamoDB Sink connector with "aws.dynamodb.pk.hash":"value.orderid" and "aws.dynamodb.pk.sort":"", your table will look similar to the following example:

orderid | address | itemid | ordertime | orderunits
2007 | {"city":{"S":"City_69"},"state":{"S":"State_19"},...} | Item_809 | 1502071602628 | 8.9866703527786968
2011 | {"city":{"S":"City_32"},"state":{"S":"State_11"},...} | Item_524 | 1494848995282 | 2.581428966318308
2012 | {"city":{"S":"City_88"},"state":{"S":"State_94"},...} | Item_169 | 1491811930181 | 1.5716303109073455

In this case, "aws.dynamodb.pk.sort":"" works when no sort key is required.

Case 3: If you run the DynamoDB Sink connector with "aws.dynamodb.pk.hash":"value.orderid" and "aws.dynamodb.pk.sort":"value.ordertime", your table will look similar to the following example:

orderid | ordertime | address | itemid | orderunits
4520 | 1519049522647 | {"city":{"S":"City_99"},"state":{"S":"State_38"},...} | Item_650 | 7.658775648983428
4522 | 1519049522647 | {"city":{"S":"City_72"},"state":{"S":"State_89"},...} | Item_503 | 2.1383312466612261
4523 | 1507101063792 | {"city":{"S":"City_74"},"state":{"S":"State_99"},...} | Item_369 | 2.1383312466612261

In this case, one of the record fields is used as the sort key (see the properties sketch following this list).
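For reference, a minimal sketch of the Case 3 settings in a connector properties file (all other properties as in the Quick Start) might look like the following:

aws.dynamodb.pk.hash=value.orderid
aws.dynamodb.pk.sort=value.ordertime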
Specify an alias¶
The Amazon DynamoDB Sink connector also allows you to specify an alias when configuring the aws.dynamodb.pk.hash and aws.dynamodb.pk.sort properties. For example, for aws.dynamodb.pk.sort, you could provide an alias similar to the following:

"aws.dynamodb.pk.sort":"value.campaignUUId:myCampaignUUId"

where myCampaignUUId is the alias name. For more details about the aws.dynamodb.pk.hash and aws.dynamodb.pk.sort properties, see DynamoDB Parameters.
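As an illustrative sketch combining both properties with aliases (myOrderId and myOrderTime are hypothetical alias names for the example record fields shown earlier):

aws.dynamodb.pk.hash=value.orderid:myOrderId
aws.dynamodb.pk.sort=value.ordertime:myOrderTime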