AWS DynamoDB Sink Connector

The Kafka Connect DynamoDB Sink connector is used to export messages from Apache Kafka® to AWS DynamoDB, allowing you to export your Kafka data into your DynamoDB key-value and document database.

The connector periodically polls data from Kafka and writes it to DynamoDB. The data from each Kafka topic is batched and sent to DynamoDB. Due to constraints from DynamoDB, each batch can only contain one change per Key and each failure in a batch must be handled before the next batch is processed to ensure the exactly once guarantees. When a table doesn’t exist, the DynamoDB Sink connector creates the table dynamically depending on configuration and permissions.

Features

The DynamoDB Sink connector for Confluent Platform includes the following features:

Exactly once delivery

The DynamoDB Sink connector guarantees exactly once delivery using its internal retry policy on a per batch basis and DynamoDB’s natural deduplication of messages as long as ordering is guaranteed. However, this requires that the primary key used by the connector to be located on a single Kafka partition. Also, an override with the same data should not trigger a change in DynamoDB Streams.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The DynamoDB Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to huge performance gains when multiple files need to be parsed.

Type conversion

The connector dynamically converts basic types and complex structures into the equivalent DynamoDB types and structures.

Record Id

The connector allows the specification of a primary key based on the record’s Kafka fields. Look at the configurations options for more information.

Install the AWS DynamoDB Sink Connector

You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Prerequisites

Note

You must install the connector on every machine where Connect will run.

  • An installation of the Confluent Hub Client.

    Note

    This is installed by default with Confluent Enterprise.

  • An installation of the latest (latest) connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent-hub install confluentinc/kafka-connect-aws-dynamodb:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent-hub install confluentinc/kafka-connect-aws-dynamodb:1.0.0-previeww
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see AWS DynamoDB Sink Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Quick Start

This quick start uses the DynamoDB connector to export data produced by the Avro console producer to DynamoDB.

Before you begin, you must create the user or IAM role running the connector with write and create access to DynamoDB.

  1. Install the connector through the Confluent Hub Client.

    # run from your CP installation directory
    confluent-hub install confluentinc/kafka-connect-aws-dynamodb:latest
    

    Tip

    By default, the plugin is installed in share/confluent-hub-components and added to the directory of the plugin path. If this is the first connector you have installed, you may need to restart the connect server for the plugin path change to take effect. See AWS DynamoDB CLI for details about setting up and using the CLI.

  2. Start the services using the Confluent CLI.

    confluent local services start
    

    Every service starts in order, printing a message with its status.

    Starting Zookeeper
    Zookeeper is [UP]
    Starting Kafka
    Kafka is [UP]
    Starting Schema Registry
    Schema Registry is [UP]
    Starting Kafka REST
    Kafka REST is [UP]
    Starting Connect
    Connect is [UP]
    Starting KSQL Server
    KSQL Server is [UP]
    Starting Control Center
    Control Center is [UP]
    

    Note

    You must ensure the connector user has write access to DynamoDB and has deployed credentials appropriately. You can also pass more properties to the credentials provider. For details, refer to AWS Credentials.

  3. Start the Avro console producer to import a few records with a simple schema in Kafka. Use the following command:

      ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic dynamodb_topic \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    
  4. Enter the following in the console producer:

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
    

    The records are published to the Kafka topic dynamodb_topic in Avro format.

  5. Find the region that the DynamoDB instance is running in (for example, us-east-2) and create a config file with the following contents. Save it as quickstart-dynamodb.properties.

    Note

    In the following example, a DynamoDB table called dynamodb_topic will be created in your DynamoDB instance.

    name=dynamodb-sink
    connector.class=io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector
    tasks.max=1
    topics=dynamodb_topic
    
    # use the region to populate the next two properties
    aws.dynamodb.region=<region>
    aws.dynamodb.endpoint=https://dynamodb.<region>.amazonaws.com
    
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    
  6. Start the DynamoDB connector by loading its configuration with the following command:

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services connect connector load dynamodb-sink --config quickstart-dynamodb.properties
    
    {
       "name": "dynamodb-sink",
       "config": {
          "connector.class": "io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector",
          "tasks.max": "1",
          "topics": "dynamodb_topic",
          "aws.dynamodb.region": "<region>",
          "aws.dynamodb.endpoint": "https://dynamodb.<region>.amazonaws.com",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": "1",
          "name": "dynamodb-sink"
       },
       "tasks": [],
       "type": "sink"
    }
    

    Important

    Don’t use the Confluent CLI commands in production environments.

  7. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status dynamodb-sink
    
  8. After the connector has ingested some records, use the AWS CLI to check that the data is available in DynamoDB.

    aws dynamodb scan --table-name dynamodb_topic --region us-east-1
    

    You should see nine items with keys:

    {
        "Items": [
            {
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "0"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value1"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "1"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value2"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "2"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value3"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "3"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value4"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "4"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value5"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "5"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value6"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "6"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value7"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "7"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value8"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "8"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value9"
                }
            }
        ],
        "Count": 9,
        "ScannedCount": 9,
        "ConsumedCapacity": null
    }
    
  9. Enter the following command to stop the Connect worker and all services:

    confluent local stop
    

    Your output should resemble:

    Stopping Control Center
    Control Center is [DOWN]
    Stopping KSQL Server
    KSQL Server is [DOWN]
    Stopping Connect
    Connect is [DOWN]
    Stopping Kafka REST
    Kafka REST is [DOWN]
    Stopping Schema Registry
    Schema Registry is [DOWN]
    Stopping Kafka
    Kafka is [DOWN]
    Stopping Zookeeper
    Zookeeper is [DOWN]
    

    Or, enter the following command to stop all services and delete all generated data:

    confluent local destroy
    

    Your output should resemble:

    Stopping Control Center
    Control Center is [DOWN]
    Stopping KSQL Server
    KSQL Server is [DOWN]
    Stopping Connect
    Connect is [DOWN]
    Stopping Kafka REST
    Kafka REST is [DOWN]
    Stopping Schema Registry
    Schema Registry is [DOWN]
    Stopping Kafka
    Kafka is [DOWN]
    Stopping Zookeeper
    Zookeeper is [DOWN]
    Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE
    

Using aws.dynamodb.pk.hash and aws.dynamodb.pk.sort

The following example will help when using the aws.dynamodb.pk.hash and aws.dynamodb.pk.sort configuration properties:

{
     "ordertime": 1511538140542,
     "orderid": 3243,
     "itemid": "Item_117",
     "orderunits": 1.135368875862714,
     "address": {
       "city": "City_43",
       "state": "State_53",
     }
}

Given the previous records in a Kafka topic in Avro, the following cases would apply when running the DynamoDB Sink connector:

  • Case 1: If you run the DynamoDB Sink connector, your table will have two more fields as shown in the following example:

    • A field for partition: "aws.dynamodb.pk.hash":"partition"
    • A field for offset: "aws.dynamodb.pk.sort":"offset"
    paritition offset address itemid orderid ordertime orderunits
    0 6075 {“city”:{“S”:City_66}, “state”:{“S”:”State_42},…} Item_246 6075 1503153618445 3.0818679447783652
    0 6076 {“city”:{“S”:City_38}, “state”:{“S”:”State_49},…} Item_536 6076 1515872966736 1.6264301342871472
    0 6077 {“city”:{“S”:City_32}, “state”:{“S”:”State_62},…} Item_997 6077 1515872966736 4.189731783402986
  • Case 2: If you run the DynamoDB Sink connector with "aws.dynamodb.pk.hash":"value.orderid" and "aws.dynamodb.pk.sort":"", your table will look similar to the following example:

    orderid address itemid ordertime orderunits
    2007 {“city”:{“S”:City_69}, “state”:{“S”:”State_19},…} Item_809 1502071602628 8.9866703527786968
    2011 {“city”:{“S”:City_32}, “state”:{“S”:”State_11},…} Item_524 1494848995282 2.581428966318308
    2012 {“city”:{“S”:City_88}, “state”:{“S”:”State_94},…} Item_169 1491811930181 1.5716303109073455

    In this case, "aws.dynamodb.pk.sort":"" works when no sort key is required.

  • Case 3: If you run the DynamoDB Sink connector with "aws.dynamodb.pk.hash":"value.orderid" and "aws.dynamodb.pk.sort":"value.ordertime", your table will look similar to the following example:

    orderid ordertime address itemid orderunits
    4520 1519049522647 {“city”:{“S”:City_99}, “state”:{“S”:”State_38},…} Item_650 7.658775648983428
    4522 1519049522647 {“city”:{“S”:City_72}, “state”:{“S”:”State_89},…} Item_503 2.1383312466612261
    4523 1507101063792 {“city”:{“S”:City_74}, “state”:{“S”:”State_99},…} Item_369 2.1383312466612261

    In this case, one of the record fields is used as a sort key.