.. _connect_dynamodb:

|aws| DynamoDB Sink Connector
=============================

.. _connect_dynamodb_introduction:

The |kconnect-long| DynamoDB Sink Connector exports messages from |ak-tm| to |aws| DynamoDB, allowing you to move your |ak| data into a DynamoDB key-value and document database.

The connector periodically polls data from |ak| and writes it to DynamoDB. The data from each |ak| topic is batched and sent to DynamoDB. Because of DynamoDB constraints, each batch can contain only one change per key, and each failure in a batch must be handled before the next batch is processed in order to preserve the exactly-once guarantees. If a table does not exist, the DynamoDB Sink connector creates the table dynamically, depending on configuration and permissions.

.. _connect_dynamodb_features:

Features
--------

The DynamoDB connector offers the following features:

* **Exactly Once Delivery**: The DynamoDB Sink Connector guarantees exactly once delivery using its internal retry policy on a per-batch basis and DynamoDB's natural deduplication of messages, as long as ordering is guaranteed. However, this requires that the primary key used by the connector be located on a single |ak| partition. Also, overwriting a record with the same data should not trigger a change in DynamoDB Streams.

* **Type Conversion**: The connector dynamically converts basic types and complex structures into the equivalent DynamoDB types and structures.

* **Record Id**: The connector allows you to specify a primary key based on the record's |ak| fields. See the :ref:`configuration options ` for more information.

.. _connect_dynamodb__install:

Install the |aws| DynamoDB Sink Connector
-----------------------------------------

.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-aws-dynamodb:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-aws-dynamodb:1.0.0-preview

--------------------------
Install Connector Manually
--------------------------

`Download and extract the ZIP file `_ for your connector and then follow the manual connector installation :ref:`instructions `.

.. _connect_dynamodb_license:

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`dynamodb-sink-connector-license-config` for license properties and :ref:`dynamodb-sink-license-topic-configuration` for information about the license topic.

.. _connect_dynamodb_quickstart:

Quick Start
-----------

This quick start uses the DynamoDB connector to export data produced by the Avro console producer to DynamoDB.

Before you begin, the user or IAM role running the connector must have `write and create access `_ to DynamoDB.

#. Install the connector through the :ref:`confluent_hub_client`.

   .. codewithvars:: bash

      # run from your CP installation directory
      confluent-hub install confluentinc/kafka-connect-aws-dynamodb:latest

   .. tip:: By default, the plugin is installed into ``share/confluent-hub-components`` and that directory is added to the plugin path. If this is the first connector you have installed, you may need to restart the Connect worker for the plugin path change to take effect. See `AWS DynamoDB CLI `_ for details about setting up and using the CLI.

#. Start the services using the Confluent CLI.

   .. codewithvars:: bash

      |confluent_start|

   Every service starts in order, printing a message with its status.

   .. include:: ../../includes/cli.rst
      :start-after: CE_CLI_startup_output
      :end-before: COS_CP_CLI_startup_output

   .. note:: You must make sure the connector user has write access to DynamoDB and has deployed credentials `appropriately `_. You can also pass additional properties to the credentials provider. For details, refer to :ref:`configurable-credentials`.
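
   For example, one common approach, assuming the connector picks up credentials through the default AWS credential provider chain, is to place them in the ``~/.aws/credentials`` file on the Connect worker host. The profile name and key values below are placeholders for illustration only; your environment may instead use environment variables or an IAM instance role.

   ::

      # ~/.aws/credentials (placeholder values -- substitute your own)
      [default]
      aws_access_key_id = YOUR_ACCESS_KEY_ID
      aws_secret_access_key = YOUR_SECRET_ACCESS_KEY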

#. Start the Avro console producer to import a few records with a simple schema into |ak|. Use the following command:

   .. sourcecode:: bash

      ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic dynamodb_topic \
      --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

#. Enter the following in the console producer:

   .. sourcecode:: bash

      {"f1": "value1"}
      {"f1": "value2"}
      {"f1": "value3"}
      {"f1": "value4"}
      {"f1": "value5"}
      {"f1": "value6"}
      {"f1": "value7"}
      {"f1": "value8"}
      {"f1": "value9"}

   The records are published to the |ak| topic ``dynamodb_topic`` in Avro format.

#. Find the region that the DynamoDB instance is running in (for example, ``us-east-2``) and create a configuration file with the following contents. Save it as ``quickstart-dynamodb.properties``.

   .. note:: In the following example, a DynamoDB table called ``dynamodb_topic`` is created in your DynamoDB instance.

   ::

      name=dynamodb-sink
      connector.class=io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector
      tasks.max=1
      topics=dynamodb_topic
      # use the region to populate the next two properties
      aws.dynamodb.region=<region>
      aws.dynamodb.endpoint=https://dynamodb.<region>.amazonaws.com
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

#. Start the DynamoDB connector by loading its configuration with the following command:

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. codewithvars:: bash

      |confluent_load| dynamodb-sink|dash| -d quickstart-dynamodb.properties

      {
        "name": "dynamodb-sink",
        "config": {
          "connector.class": "io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector",
          "tasks.max": "1",
          "topics": "dynamodb_topic",
          "aws.dynamodb.region": "<region>",
          "aws.dynamodb.endpoint": "https://dynamodb.<region>.amazonaws.com",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": "1",
          "name": "dynamodb-sink"
        },
        "tasks": [],
        "type": "sink"
      }

   .. important:: Don't use the :ref:`Confluent CLI ` commands in production environments.

#. Confirm that the connector is in a ``RUNNING`` state.

   .. codewithvars:: bash

      |confluent_status| dynamodb-sink
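
   As an alternative to the CLI status command, you can query the |kconnect-long| REST API directly. The following sketch assumes the Connect worker's REST interface is listening on the default port 8083 on localhost; adjust the host and port if your worker is configured differently.

   .. sourcecode:: bash

      # Check the connector and task status through the Connect REST API.
      # A healthy connector reports "state": "RUNNING" for the connector and its task.
      curl -s http://localhost:8083/connectors/dynamodb-sink/status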

#. After the connector has ingested some records, use the AWS CLI to check that the data is available in DynamoDB, specifying the region your table is in (``us-east-2`` in this example):

   .. sourcecode:: bash

      aws dynamodb scan --table-name dynamodb_topic --region us-east-2

   You should see nine items with keys:

   .. sourcecode:: bash

      {
          "Items": [
              { "partition": { "N": "0" }, "offset": { "N": "0" }, "name": { "S": "f1" }, "type": { "S": "value1" } },
              { "partition": { "N": "0" }, "offset": { "N": "1" }, "name": { "S": "f1" }, "type": { "S": "value2" } },
              { "partition": { "N": "0" }, "offset": { "N": "2" }, "name": { "S": "f1" }, "type": { "S": "value3" } },
              { "partition": { "N": "0" }, "offset": { "N": "3" }, "name": { "S": "f1" }, "type": { "S": "value4" } },
              { "partition": { "N": "0" }, "offset": { "N": "4" }, "name": { "S": "f1" }, "type": { "S": "value5" } },
              { "partition": { "N": "0" }, "offset": { "N": "5" }, "name": { "S": "f1" }, "type": { "S": "value6" } },
              { "partition": { "N": "0" }, "offset": { "N": "6" }, "name": { "S": "f1" }, "type": { "S": "value7" } },
              { "partition": { "N": "0" }, "offset": { "N": "7" }, "name": { "S": "f1" }, "type": { "S": "value8" } },
              { "partition": { "N": "0" }, "offset": { "N": "8" }, "name": { "S": "f1" }, "type": { "S": "value9" } }
          ],
          "Count": 9,
          "ScannedCount": 9,
          "ConsumedCapacity": null
      }

#. Enter the following command to stop the Connect worker and all services:

   .. codewithvars:: bash

      |confluent_stop|

   Your output should resemble:

   .. include:: ../../includes/cli.rst
      :start-after: ce_cli_stop_output_start
      :end-before: ce_cli_stop_output_stop

   Or, enter the following command to stop all services and delete all generated data:

   .. codewithvars:: bash

      |confluent_destroy|

   Your output should resemble:

   .. include:: ../../includes/cli.rst
      :start-after: ce_cli_stop_destroy_output_start
      :end-before: ce_cli_stop_destroy_output_stop

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   configuration_options