Amazon DynamoDB Sink Connector

The Kafka Connect DynamoDB Sink connector exports messages from Apache Kafka® to Amazon DynamoDB, so that your Kafka data is available in DynamoDB's key-value and document database.

The connector periodically polls data from Kafka and writes it to DynamoDB. The data from each Kafka topic is batched and sent to DynamoDB. Due to DynamoDB constraints, each batch can contain only one change per key, and every failure in a batch must be handled before the next batch is processed in order to preserve the exactly-once guarantee. When a table doesn’t exist, the DynamoDB Sink connector creates it dynamically, depending on configuration and permissions.
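
The one-change-per-key constraint can be illustrated with a minimal sketch. This is not the connector's implementation; the function name, the record shape, and the max_batch_size parameter are hypothetical. The default of 25 matches the per-request item limit of DynamoDB's BatchWriteItem operation, but whether the connector uses that operation or that size is an assumption.

```python
from typing import Hashable, Iterable, List, Tuple

# Hypothetical record shape: (primary key, item to write).
Record = Tuple[Hashable, dict]


def split_into_batches(records: Iterable[Record], max_batch_size: int = 25) -> List[List[Record]]:
    """Group records in order so that no batch holds two changes for one key."""
    batches: List[List[Record]] = []
    current: List[Record] = []
    seen_keys = set()
    for key, item in records:
        # Close the batch when it is full or already holds a change for this key.
        if key in seen_keys or len(current) >= max_batch_size:
            batches.append(current)
            current, seen_keys = [], set()
        current.append((key, item))
        seen_keys.add(key)
    if current:
        batches.append(current)
    return batches
```

Because a batch is closed as soon as a key repeats, the relative order of changes to the same key is preserved across batches.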

Features

The DynamoDB Sink connector for Confluent Platform includes the following features:

Exactly once delivery

The DynamoDB Sink connector guarantees exactly-once delivery by combining its internal per-batch retry policy with DynamoDB’s natural deduplication of messages, as long as ordering is guaranteed. This requires that all records for a given primary key used by the connector are located on a single Kafka partition. In addition, an overwrite with the same data should not trigger a change in DynamoDB Streams.
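
The following sketch shows why replaying ordered writes is harmless when each write replaces the whole item for its key. It models a table as a plain dictionary; the names and data are hypothetical and nothing here calls DynamoDB.

```python
from typing import Dict, List, Tuple

# Hypothetical write shape: (primary key, full item).
Write = Tuple[str, dict]


def apply_writes(table: Dict[str, dict], writes: List[Write]) -> Dict[str, dict]:
    """Apply full-item puts in order; a put replaces whatever is stored for the key."""
    for key, item in writes:
        table[key] = dict(item)
    return table


writes = [("order-1", {"units": 1}), ("order-2", {"units": 5}), ("order-1", {"units": 2})]

# All writes applied exactly once.
once = apply_writes({}, writes)

# Simulated retry: the first two writes landed, then the whole sequence is replayed.
retried = apply_writes(apply_writes({}, writes[:2]), writes)

# Replaying out of order leaves a stale item, which is why ordering must be guaranteed.
reordered = apply_writes({}, list(reversed(writes)))
```

Here once and retried hold the same final items, while reordered ends with the older value for order-1.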

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The DynamoDB Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve throughput when the connector consumes from multiple topic partitions.

Type conversion

The connector dynamically converts basic types and complex structures into the equivalent DynamoDB types and structures.
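
As a rough illustration of this kind of conversion, the sketch below maps Python values to DynamoDB's attribute-value notation (S, N, BOOL, NULL, M, L). The function name is hypothetical and the exact mapping the connector applies to Kafka Connect types is not shown in this document, so treat this as an assumption about the general shape rather than the connector's rules.

```python
from decimal import Decimal
from typing import Any


def to_attribute_value(value: Any) -> dict:
    """Convert a Python value to DynamoDB's attribute-value form."""
    if value is None:
        return {"NULL": True}
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return {"BOOL": value}
    if isinstance(value, (int, float, Decimal)):
        return {"N": str(value)}  # DynamoDB represents numbers as strings
    if isinstance(value, str):
        return {"S": value}
    if isinstance(value, dict):
        # Nested structures become a map of attribute values.
        return {"M": {k: to_attribute_value(v) for k, v in value.items()}}
    if isinstance(value, (list, tuple)):
        return {"L": [to_attribute_value(v) for v in value]}
    raise TypeError(f"unsupported type: {type(value).__name__}")
```

For example, a nested record such as {"orderid": 3243, "address": {"city": "City_43"}} becomes a map (M) whose address entry is itself a map.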

Record Id

The connector lets you specify a primary key based on fields of the Kafka record. For more information, see the aws.dynamodb.pk.hash and aws.dynamodb.pk.sort configuration properties.

Limitations

The connector does not currently support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:

  • io.debezium.transforms.ByLogicalTableRouter
  • io.debezium.transforms.outbox.EventRouter
  • org.apache.kafka.connect.transforms.RegexRouter
  • org.apache.kafka.connect.transforms.TimestampRouter
  • io.confluent.connect.transforms.MessageTimestampRouter
  • io.confluent.connect.transforms.ExtractTopic$Key
  • io.confluent.connect.transforms.ExtractTopic$Value
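
A connector configuration can be checked against this list before it is deployed. The sketch below is a hypothetical helper, not part of the connector; it relies only on the standard Kafka Connect convention that transforms lists aliases and transforms.<alias>.type names each transformation class.

```python
from typing import Dict, List

# Transformations listed above as not allowed with this connector.
DISALLOWED_SMTS = {
    "io.debezium.transforms.ByLogicalTableRouter",
    "io.debezium.transforms.outbox.EventRouter",
    "org.apache.kafka.connect.transforms.RegexRouter",
    "org.apache.kafka.connect.transforms.TimestampRouter",
    "io.confluent.connect.transforms.MessageTimestampRouter",
    "io.confluent.connect.transforms.ExtractTopic$Key",
    "io.confluent.connect.transforms.ExtractTopic$Value",
}


def find_disallowed_smts(config: Dict[str, str]) -> List[str]:
    """Return the transform aliases whose type is on the disallowed list."""
    aliases = [a.strip() for a in config.get("transforms", "").split(",") if a.strip()]
    return [a for a in aliases if config.get(f"transforms.{a}.type") in DISALLOWED_SMTS]
```

An empty result means none of the configured transformations is on the list; it does not prove that a custom transformation leaves the topic name unchanged.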

Install the Amazon DynamoDB Sink Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-aws-dynamodb:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-aws-dynamodb:1.0.0-preview
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Amazon DynamoDB Sink Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

AWS Credentials

The following sections provide information about how to configure an AWS DynamoDb connector to provide credentials when connecting to AWS.

Credentials provider chain

By default, the AWS DynamoDb connector looks for credentials in the following locations and in the following order:

  1. The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables accessible to the Connect worker processes where the connector will be deployed. These variables are recognized by the AWS CLI and all AWS SDKs (except for the AWS SDK for .NET). You can use the export command to set these variables.

    export AWS_ACCESS_KEY_ID=<your_access_key_id>
    export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>
    

    The AWS_ACCESS_KEY and AWS_SECRET_KEY environment variables can be used instead, but are not recognized by the AWS CLI.

  2. The aws.accessKeyId and aws.secretKey Java system properties on the Connect worker processes where the connector will be deployed. However, these properties are only recognized by the AWS SDK for Java and are not recommended.

  3. The ~/.aws/credentials file located in the home directory of the operating system user that runs the Connect worker processes. These credentials are recognized by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

    aws configure
    

    You can also manually create the credentials file using a text editor. The file should contain lines in the format shown in the following example. See AWS Credentials File Format for more details.

    [default]
    aws_access_key_id = <your_access_key_id>
    aws_secret_access_key = <your_secret_access_key>
    

    Note

    When creating the credentials file, ensure the user creating the credentials file is the same user that’s running the Connect worker processes and that the credentials file is in the user’s home directory. Otherwise, the AWS DynamoDb connector will not be able to find the credentials.

  4. A query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials. This is applicable only if the Connect worker processes are running in AWS containers.

  5. A metadata query that returns credentials from an EC2 instance. This is applicable only if the Connect worker processes are running in EC2 instances.

Choose one of the methods above to define the AWS credentials that the AWS DynamoDb connectors use, verify the credentials implementation is set correctly, and then restart all of the Connect worker processes.
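
Before restarting the workers, it can help to check which of these locations is populated for the worker's user. The sketch below is a hypothetical diagnostic that follows the order listed above; it does not call the AWS SDK and does not validate the credentials themselves.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def first_credentials_source(env: Mapping[str, str], home: Path) -> Optional[str]:
    """Report the first source in the documented order that looks configured.

    Java system properties (step 2) live inside the worker JVM and cannot be
    inspected from here, so this check skips them. The EC2 metadata query
    (step 5) needs a network call and is also left out.
    """
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        return "environment variables"
    if env.get("AWS_ACCESS_KEY") and env.get("AWS_SECRET_KEY"):
        return "environment variables (alternate names)"
    if (home / ".aws" / "credentials").is_file():
        return "credentials file"
    if env.get("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"):
        return "container credentials endpoint"
    return None


if __name__ == "__main__":
    # Run as the same operating system user that runs the Connect worker.
    print(first_credentials_source(os.environ, Path.home()))
```

Run the check as the operating system user that runs the Connect worker processes, since both the environment and the home directory depend on that user.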

Note

Confluent recommends using either environment variables or a credentials file because they are the most straightforward, and can be checked using the AWS CLI tool before running the connector.

All AWS DynamoDb connectors running in the same Connect worker cluster use the same credentials. This is sufficient for many use cases. If you want more control, refer to the following section to learn more about controlling and customizing how the AWS DynamoDb connector gets AWS credentials.

Caution

If you configure one of the AWS key and AWS secret key implementations (as mentioned above), credentials cannot be supplied through the following credentials providers or by using the Trusted Account Credentials implementation. Trying to provide credentials using multiple implementations will result in an authentication failure.

Credentials providers

A credentials provider is a Java class that implements the com.amazonaws.auth.AWSCredentialsProvider interface in the AWS Java library and returns AWS credentials from the environment. By default, the AWS DynamoDb connector configuration property aws.dynamodb.credentials.provider.class uses the com.amazonaws.auth.DefaultAWSCredentialsProviderChain class. This class chains together five other credential provider classes.

The com.amazonaws.auth.DefaultAWSCredentialsProviderChain implementation looks for credentials in the following order:

  1. Environment variables using the com.amazonaws.auth.EnvironmentVariableCredentialsProvider class implementation. This implementation uses environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. Environment variables AWS_ACCESS_KEY and AWS_SECRET_KEY are also supported by this implementation; however, these two variables are only recognized by the AWS SDK for Java and are not recommended.

  2. Java system properties using the com.amazonaws.auth.SystemPropertiesCredentialsProvider class implementation. This implementation uses Java system properties aws.accessKeyId and aws.secretKey.

  3. Credentials file using the com.amazonaws.auth.profile.ProfileCredentialsProvider class implementation. This implementation uses a credentials file located in the path ~/.aws/credentials. This credentials provider can be used by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

    aws configure
    

    You can also manually create the credentials file using a text editor. The file should contain lines in the format shown in the following example. See AWS Credentials File Format for more details.

    [default]
    aws_access_key_id = <your_access_key_id>
    aws_secret_access_key = <your_secret_access_key>
    

    Note

    When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user’s home directory. Otherwise, the AWS DynamoDb connector will not be able to find the credentials.

  4. Amazon Elastic Container Service (ECS) container credentials using the com.amazonaws.auth.ContainerCredentialsProvider class implementation. This implementation uses a query sent to http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} to return AWS credentials for the AWS DynamoDb connector. For this provider to work, the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI must be set. See IAM Roles for Tasks for additional information about setting up this query.

  5. EC2 instance profile credentials using the com.amazonaws.auth.InstanceProfileCredentialsProvider class implementation. EC2 instance metadata is queried for credentials. See Amazon EC2 metadata service for additional information about instance metadata queries. See Working with AWS credentials for additional information and updates from AWS.

    Note

    EC2 instance profile credentials can be used only if the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is not set. See com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper for more information.

Using Trusted Account Credentials

This connector can assume a role and use credentials from a separate trusted account. This is a default feature provided with recent versions of this connector that include an updated version of the AWS SDK.

Important

You cannot use assumed role credentials to access AWS through a proxy server without first passing environment variables or system properties. This is due to an AWS SDK limitation.

After you create the trust relationship, an IAM user or an application from the trusted account can use the AWS Security Token Service (AWS STS) AssumeRole API operation. This operation provides temporary security credentials that enable access to AWS resources for the connector. For details, see Creating a Role to Delegate Permissions to an IAM User.

Example:
Profile in ~/.aws/credentials:

[default]
role_arn=arn:aws:iam::037803949979:role/kinesis_cross_account_role
source_profile=staging
role_session_name = OPTIONAL_SESSION_NAME

[staging]
aws_access_key_id = <STAGING KEY>
aws_secret_access_key = <STAGING SECRET>

To allow the connector to assume a role with the right permissions, set the Amazon Resource Name (ARN) for this role. Additionally, you must choose between source_profile or credential_source as the way to get credentials that have permission to assume the role, in the environment where the connector is running.
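
The rules in the previous paragraph can be checked mechanically. The sketch below parses the text of a credentials file with Python's standard configparser module and reports problems in an assume-role profile; the function name and messages are hypothetical, and the check covers only the rules stated above.

```python
import configparser
from typing import List


def check_assume_role_profile(credentials_text: str, profile: str = "default") -> List[str]:
    """Return a list of problems found in an assume-role profile (empty if none)."""
    # Disable interpolation so that "%" characters in values are read literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(credentials_text)
    if not parser.has_section(profile):
        return [f"profile [{profile}] not found"]
    section = parser[profile]
    problems = []
    if "role_arn" not in section:
        problems.append("role_arn is missing")
    has_source = "source_profile" in section
    has_cred_source = "credential_source" in section
    if has_source == has_cred_source:
        problems.append("set exactly one of source_profile or credential_source")
    if has_source and not parser.has_section(section["source_profile"]):
        problems.append(f"source profile [{section['source_profile']}] not found")
    return problems
```

Passing the contents of the example profile above returns an empty list, because role_arn is set and source_profile points at the existing staging profile.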

Note

When setting up trusted account credentials, be aware that the approach of loading profiles from both ~/.aws/credentials and ~/.aws/config does not work when configuring this connector. Assumed role settings and credentials must be placed in the ~/.aws/credentials file.

Additionally, the DynamoDB Sink connector implements the AwsAssumeRoleCredentialsProvider, which means you can use the following properties to configure the assume role operation.

aws.dynamodb.credentials.provider.class=io.confluent.connect.aws.dynamodb.auth.AwsAssumeRoleCredentialsProvider
aws.dynamodb.credentials.provider.sts.role.arn=arn:aws:iam::012345678901:role/my-restricted-role
aws.dynamodb.credentials.provider.sts.role.session.name=session-name
aws.dynamodb.credentials.provider.sts.role.external.id=external-id

Using Other Implementations

You can use a different credentials provider. To do this, set the aws.dynamodb.credentials.provider.class property to the name of any class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.

Complete the following steps to use a different credentials provider:

  1. Find or create a Java credentials provider class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.

  2. Put the class file in a JAR file.

  3. Place the JAR file in the share/java/kafka-connect-amazon-dynamodb directory on all Connect workers.

  4. Restart the Connect workers.

  5. Change the AWS DynamoDb connector property file to use your custom credentials. Add the provider class entry aws.dynamodb.credentials.provider.class=<className> in the AWS DynamoDb connector properties file.

    Important

    You must use the fully-qualified class name in the <className> entry.

Quick Start

This quick start uses the DynamoDB connector to export data produced by the Avro console producer to DynamoDB.

Before you begin, ensure that the user or IAM role that runs the connector has write and create access to DynamoDB.

  1. Install the connector using the Confluent CLI.

    # run from your CP installation directory
    confluent connect plugin install confluentinc/kafka-connect-aws-dynamodb:latest
    

    Tip

    By default, the command installs the plugin into the share/confluent-hub-components directory and adds the directory to the plugin path. For the plugin path change to take effect, you must restart the Connect worker. For details about setting up and using the CLI, see AWS DynamoDB CLI.

  2. Start the services using the Confluent CLI.

    confluent local services start
    

    Every service starts in order, printing a message with its status.

    Starting Zookeeper
    Zookeeper is [UP]
    Starting Kafka
    Kafka is [UP]
    Starting Schema Registry
    Schema Registry is [UP]
    Starting Kafka REST
    Kafka REST is [UP]
    Starting Connect
    Connect is [UP]
    Starting KSQL Server
    KSQL Server is [UP]
    Starting Control Center
    Control Center is [UP]
    

    Note

    You must ensure the connector user has write access to DynamoDB and has deployed credentials appropriately. You can also pass more properties to the credentials provider. For details, refer to AWS Credentials.

  3. Start the Avro console producer to write a few records with a simple schema to Kafka. Use the following command:

    ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic dynamodb_topic \
      --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    
  4. Enter the following in the console producer:

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
    

    The records are published to the Kafka topic dynamodb_topic in Avro format.

  5. Find the region that the DynamoDB instance is running in (for example, us-east-2) and create a config file with the following contents. Save it as quickstart-dynamodb.properties.

    Note

    In the following example, a DynamoDB table called dynamodb_topic will be created in your DynamoDB instance.

    name=dynamodb-sink
    connector.class=io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector
    tasks.max=1
    topics=dynamodb_topic
    
    # use the region to populate the next two properties
    aws.dynamodb.region=<region>
    aws.dynamodb.endpoint=https://dynamodb.<region>.amazonaws.com
    
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    
  6. Start the DynamoDB connector by loading its configuration with the following command:

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services connect connector load dynamodb-sink --config quickstart-dynamodb.properties
    
    {
       "name": "dynamodb-sink",
       "config": {
          "connector.class": "io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector",
          "tasks.max": "1",
          "topics": "dynamodb_topic",
          "aws.dynamodb.region": "<region>",
          "aws.dynamodb.endpoint": "https://dynamodb.<region>.amazonaws.com",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": "1",
          "name": "dynamodb-sink"
       },
       "tasks": [],
       "type": "sink"
    }
    

    Important

    Don’t use the Confluent CLI commands in production environments.

  7. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status dynamodb-sink
    
  8. After the connector has ingested some records, use the AWS CLI to check that the data is available in DynamoDB.

    aws dynamodb scan --table-name dynamodb_topic --region <region>
    

    You should see nine items with keys:

    {
        "Items": [
            {
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "0"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value1"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "1"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value2"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "2"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value3"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "3"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value4"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "4"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value5"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "5"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value6"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "6"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value7"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "7"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value8"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "8"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value9"
                }
            }
        ],
        "Count": 9,
        "ScannedCount": 9,
        "ConsumedCapacity": null
    }
    
  9. Enter the following command to stop the Connect worker and all services:

    confluent local services stop
    

    Your output should resemble:

    Stopping Control Center
    Control Center is [DOWN]
    Stopping KSQL Server
    KSQL Server is [DOWN]
    Stopping Connect
    Connect is [DOWN]
    Stopping Kafka REST
    Kafka REST is [DOWN]
    Stopping Schema Registry
    Schema Registry is [DOWN]
    Stopping Kafka
    Kafka is [DOWN]
    Stopping Zookeeper
    Zookeeper is [DOWN]
    

    Or, enter the following command to stop all services and delete all generated data:

    confluent local destroy
    

    Your output should resemble:

    Stopping Control Center
    Control Center is [DOWN]
    Stopping KSQL Server
    KSQL Server is [DOWN]
    Stopping Connect
    Connect is [DOWN]
    Stopping Kafka REST
    Kafka REST is [DOWN]
    Stopping Schema Registry
    Schema Registry is [DOWN]
    Stopping Kafka
    Kafka is [DOWN]
    Stopping Zookeeper
    Zookeeper is [DOWN]
    Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE
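
The configuration file from step 5 repeats the region in two properties. As a small convenience, the sketch below renders the same properties for a given region; the function name is hypothetical and the property values are copied from the quick start.

```python
def quickstart_properties(region: str, topic: str = "dynamodb_topic") -> str:
    """Render the quick start connector properties for one AWS region."""
    settings = {
        "name": "dynamodb-sink",
        "connector.class": "io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector",
        "tasks.max": "1",
        "topics": topic,
        # The region appears in both the region and the endpoint properties.
        "aws.dynamodb.region": region,
        "aws.dynamodb.endpoint": f"https://dynamodb.{region}.amazonaws.com",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
    }
    return "\n".join(f"{key}={value}" for key, value in settings.items()) + "\n"
```

Writing the result of quickstart_properties("us-east-2") to quickstart-dynamodb.properties gives a file equivalent to the one in step 5, without the comment line.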
    

Use aws.dynamodb.pk.hash and aws.dynamodb.pk.sort

The following example record illustrates how the aws.dynamodb.pk.hash and aws.dynamodb.pk.sort configuration properties affect the resulting table:

{
     "ordertime": 1511538140542,
     "orderid": 3243,
     "itemid": "Item_117",
     "orderunits": 1.135368875862714,
     "address": {
       "city": "City_43",
       "state": "State_53"
     }
}

Given records like the previous example in a Kafka topic in Avro format, the following cases apply when running the DynamoDB Sink connector:

  • Case 1: If you run the DynamoDB Sink connector, your table will have two more fields, as shown in the following example:

    • A field for partition: "aws.dynamodb.pk.hash":"partition"
    • A field for offset: "aws.dynamodb.pk.sort":"offset"

    partition  offset  address                                               itemid    orderid  ordertime      orderunits
    0          6075    {"city":{"S":"City_66"}, "state":{"S":"State_42"},…}  Item_246  6075     1503153618445  3.0818679447783652
    0          6076    {"city":{"S":"City_38"}, "state":{"S":"State_49"},…}  Item_536  6076     1515872966736  1.6264301342871472
    0          6077    {"city":{"S":"City_32"}, "state":{"S":"State_62"},…}  Item_997  6077     1515872966736  4.189731783402986

  • Case 2: If you run the DynamoDB Sink connector with "aws.dynamodb.pk.hash":"value.orderid" and "aws.dynamodb.pk.sort":"", your table will look similar to the following example:

    orderid  address                                               itemid    ordertime      orderunits
    2007     {"city":{"S":"City_69"}, "state":{"S":"State_19"},…}  Item_809  1502071602628  8.9866703527786968
    2011     {"city":{"S":"City_32"}, "state":{"S":"State_11"},…}  Item_524  1494848995282  2.581428966318308
    2012     {"city":{"S":"City_88"}, "state":{"S":"State_94"},…}  Item_169  1491811930181  1.5716303109073455

    In this case, "aws.dynamodb.pk.sort":"" works when no sort key is required.

  • Case 3: If you run the DynamoDB Sink connector with "aws.dynamodb.pk.hash":"value.orderid" and "aws.dynamodb.pk.sort":"value.ordertime", your table will look similar to the following example:

    orderid  ordertime      address                                               itemid    orderunits
    4520     1519049522647  {"city":{"S":"City_99"}, "state":{"S":"State_38"},…}  Item_650  7.658775648983428
    4522     1519049522647  {"city":{"S":"City_72"}, "state":{"S":"State_89"},…}  Item_503  2.1383312466612261
    4523     1507101063792  {"city":{"S":"City_74"}, "state":{"S":"State_99"},…}  Item_369  2.1383312466612261

    In this case, one of the record fields is used as a sort key.
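
The three cases can be summarized in a small model. The sketch below is not the connector's code; the function names are hypothetical, and it handles only the key forms shown in the cases above (partition, offset, value.<field>, and an empty string for no key).

```python
from typing import Any, Dict, Optional


def resolve_key(spec: str, partition: int, offset: int, value: Dict[str, Any]) -> Optional[tuple]:
    """Resolve one key spec to (attribute name, attribute value), or None if empty."""
    if spec == "":
        return None  # no key of this kind (for example, no sort key)
    if spec == "partition":
        return ("partition", partition)
    if spec == "offset":
        return ("offset", offset)
    if spec.startswith("value."):
        field = spec[len("value."):]
        return (field, value[field])
    raise ValueError(f"unsupported key spec: {spec!r}")


def build_item(pk_hash: str, pk_sort: str, partition: int, offset: int,
               value: Dict[str, Any]) -> Dict[str, Any]:
    """Merge the resolved key attributes with the record's value fields."""
    item = dict(value)
    for spec in (pk_hash, pk_sort):
        resolved = resolve_key(spec, partition, offset, value)
        if resolved is not None:
            name, attr = resolved
            item[name] = attr
    return item
```

With the keys set to partition and offset, the item gains two attributes, as in case 1. With keys taken from the value, as in cases 2 and 3, the item has exactly the record's own fields.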

Specify an alias

The Amazon DynamoDB Sink connector also allows you to specify an alias when configuring the aws.dynamodb.pk.hash and aws.dynamodb.pk.sort properties. For example, for aws.dynamodb.pk.sort, you could provide an alias similar to the following:

"aws.dynamodb.pk.sort":"value.campaignUUId:myCampaignUUId"

where myCampaignUUId is the alias name. For more details about the aws.dynamodb.pk.hash and aws.dynamodb.pk.sort properties, see DynamoDB Parameters.
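
The alias syntax can be read as a source path and an optional attribute name separated by a colon. The sketch below parses that form; the function names are hypothetical, and the fallback to the last path segment when no alias is given is an assumption based on the column names in the case 2 and case 3 tables.

```python
from typing import Optional, Tuple


def parse_key_spec(spec: str) -> Tuple[str, Optional[str]]:
    """Split 'source:alias' into (source, alias); alias is None when absent."""
    source, sep, alias = spec.partition(":")
    return (source, alias if sep else None)


def attribute_name(spec: str) -> str:
    """Pick the attribute name: the alias if given, else the last path segment."""
    source, alias = parse_key_spec(spec)
    return alias or source.rsplit(".", 1)[-1]
```

For the example above, parse_key_spec separates value.campaignUUId from myCampaignUUId, and attribute_name selects the alias as the attribute name.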