Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

Kafka Connect AWS DynamoDB Sink Connector

The Kafka Connect DynamoDB Sink Connector is used to export messages from Kafka to AWS DynamoDB, allowing you to export your Kafka data into your DynamoDB key-value and document database.

The connector periodically polls data from Kafka and writes it to DynamoDB. The data from each Kafka topic is batched and sent to DynamoDB. Due to constraints from DynamoDB, each batch can only contain one change per Key and each failure in a batch must be handled before the next batch is processed to ensure the exactly once guarantees. When a table doesn’t exist, the DynamoDB Sink connector creates the table dynamically depending on configuration and permissions.

Features

The DynamoDB connector offers a variety of features:

  • Exactly Once Delivery: The DynamoDB Sink Connector guarantees exactly once delivery using its internal retry policy on a per batch basis and DynamoDB’s natural deduplication of messages as long as ordering is guaranteed. However, this requires that the primary key used by the connector to be located on a single Kafka partition. Also, an override with the same data should not trigger a change in DynamoDB Streams.
  • Type Conversion: The connector dynamically converts basic types and complex structures into the equivalent DynamoDB types and structures.
  • Record Id: The connector allows the specification of a primary key based on the record’s Kafka fields. Look at the configurations options for more information.

Install AWS DynamoDB Sink Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run this command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will be run.

confluent-hub install confluentinc/kafka-connect-aws-dynamodb:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-aws-dynamodb:1.0.0-preview

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Quick Start

This quick start uses the DynamoDB connector to export data produced by the Avro console producer to DynamoDB.

Before you begin, you must create the user or IAM role running the connector with write and create access to DynamoDB.

  1. Install the connector through the Confluent Hub Client.

    # run from your CP installation directory
    confluent-hub install confluentinc/kafka-connect-aws-dynamodb:latest
    

    Tip

    By default, the plugin is installed in share/confluent-hub-components and added to the directory of the plugin path. If this is the first connector you have installed, you may need to restart the connect server for the plugin path change to take effect. See AWS DynamoDB CLI for details about setting up and using the CLI.

  2. Start the services using the Confluent CLI.

    confluent start
    

    Every service starts in order, printing a message with its status.

    Starting zookeeper
    zookeeper is [UP]
    Starting kafka
    kafka is [UP]
    Starting schema-registry
    schema-registry is [UP]
    Starting kafka-rest
    kafka-rest is [UP]
    Starting connect
    connect is [UP]
    Starting ksql-server
    ksql-server is [UP]
    Starting control-center
    control-center is [UP]
    

    Note

    You need to make sure the connector user has write access to DynamoDB and has deployed credentials appropriately. You can also pass additional properties to the credentials provider. For details, refer to S3 Connector Credentials.

  3. Start the Avro console producer to import a few records with a simple schema in Kafka. Use the following command:

      ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic dynamodb_topic \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    
  4. Enter the following in the console producer:

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
    

    The records are published to the Kafka topic dynamodb_topic in Avro format.

  5. Find the region that the DynamoDB instance is running in (for example, us-east-2) and create a config file with the following contents. Save it as quickstart-dynamodb.properties.

    Note

    In the following example, a DynamoDB table called dynamodb_topic will be created in your DynamoDB instance.

    name=dynamodb-sink
    connector.class=io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector
    tasks.max=1
    topics=dynamodb_topic
    
    # use the region to populate the next two properties
    aws.dynamodb.region=<region>
    aws.dynamodb.endpoint=https://dynamodb.<region>.amazonaws.com
    
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    
  6. Start the DynamoDB connector by loading its configuration with the following command:

    confluent load dynamodb-sink​ -d quickstart-dynamodb.properties
    
    {
       "name": "dynamodb-sink",
       "config": {
          "connector.class": "io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector",
          "tasks.max": "1",
          "topics": "dynamodb_topic",
          "aws.dynamodb.region": "<region>",
          "aws.dynamodb.endpoint": "https://dynamodb.<region>.amazonaws.com",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": "1",
          "name": "dynamodb-sink"
       },
       "tasks": [],
       "type": "sink"
    }
    

    Important

    Don’t use the Confluent CLI commands in production environments.

  7. Confirm that the connector is in a RUNNING state.

    confluent status dynamodb-sink
    
  8. After the connector has ingested some records, use the AWS CLI to check that the data is available in DynamoDB.

    aws dynamodb scan --table-name dynamodb_topic --region us-east-1
    

    You should see nine items with keys:

    {
        "Items": [
            {
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "0"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value1"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "1"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value2"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "2"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value3"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "3"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value4"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "4"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value5"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "5"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value6"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "6"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value7"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "7"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value8"
                }
            },{
                "partition": {
                    "N": "0"
                },"offset": {
                    "N": "8"
                },"name": {
                    "S": "f1"
                },"type": {
                    "S": "value9"
                }
            }
        ],
        "Count": 9,
        "ScannedCount": 9,
        "ConsumedCapacity": null
    }
    
  9. Enter the following command to stop the Connect worker and all services:

    confluent stop
    

    Your output should resemble:

    Stopping control-center
    control-center is [DOWN]
    Stopping ksql-server
    ksql-server is [DOWN]
    Stopping connect
    connect is [DOWN]
    Stopping kafka-rest
    kafka-rest is [DOWN]
    Stopping schema-registry
    schema-registry is [DOWN]
    Stopping kafka
    kafka is [DOWN]
    Stopping zookeeper
    zookeeper is [DOWN]
    

    Or, enter the following command to stop all services and delete all generated data:

    confluent destroy
    

    Your output should resemble:

    Stopping control-center
    control-center is [DOWN]
    Stopping ksql-server
    ksql-server is [DOWN]
    Stopping connect
    connect is [DOWN]
    Stopping kafka-rest
    kafka-rest is [DOWN]
    Stopping schema-registry
    schema-registry is [DOWN]
    Stopping kafka
    kafka is [DOWN]
    Stopping zookeeper
    zookeeper is [DOWN]
    Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE
    

Additional Documentation