.. _kinesis-source-connector:

|kconnect-long| Kinesis Source Connector
========================================

The Kinesis Source Connector is used to pull data from Amazon Kinesis and persist the data to an |ak-tm| topic.

Install the Kinesis Connector
-----------------------------

.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-kinesis:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-kinesis:1.1.4

--------------------------
Install Connector Manually
--------------------------

`Download and extract the ZIP file <https://www.confluent.io/connector/kafka-connect-kinesis/#download>`_ for your
connector and then follow the manual connector installation :ref:`instructions <connect_install_connectors>`.

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`kinesis-source-connector-license-config` for license properties and :ref:`kinesis-source-license-topic-configuration` for information about the license topic.


Usage Notes
-----------

The default credentials provider is ``DefaultAWSCredentialsProviderChain``. For more information, see the
`AWS documentation <https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html>`_.

.. _kinesis-source-connector-examples:

Examples
--------

.. include:: ../includes/demo-kinesis.rst

----------------------
Property-based example
----------------------


This configuration is typically used with :ref:`standalone workers <standalone-workers>`.

.. codewithvars:: bash
   :emphasize-lines: 4,5,6,7

    name=KinesisSourceConnector1
    connector.class=io.confluent.connect.kinesis.KinesisSourceConnector
    tasks.max=1
    aws.access.key.id=< Optional Configuration >
    aws.secret.key.id=< Optional Configuration >
    kafka.topic=< Required Configuration >
    kinesis.stream=< Required Configuration >
    kinesis.region=< Optional Configuration - defaults to US_EAST_1 >
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1


------------------
REST-based example
------------------


This configuration is typically used with :ref:`distributed workers <distributed-workers>`.
Write the following JSON to ``connector.json``, configure all of the required values, and use the command below to
post the configuration to one of the distributed Connect workers. For more information, see the
|kconnect-long| :ref:`REST API <connect_userguide_rest>` documentation.

**Connect distributed REST-based example:**

.. codewithvars:: bash
   :emphasize-lines: 6,7,8,9

    {
      "name" : "KinesisSourceConnector1",
      "config" : {
        "connector.class" : "io.confluent.connect.kinesis.KinesisSourceConnector",
        "tasks.max" : "1",
        "aws.access.key.id" : "< Optional Configuration >",
        "aws.secret.key.id" : "< Optional Configuration >",
        "kafka.topic" : "< Required Configuration >",
        "kinesis.stream" : "< Required Configuration >"
      }
    }
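
Before posting, it can help to confirm the file is valid JSON with the required values filled in. The sketch below writes an example ``connector.json`` (the topic and stream names are placeholder assumptions; substitute your own) and checks it with a short Python one-liner:

```shell
# Write connector.json with example values (kinesis_topic and my_kinesis_stream are placeholders)
cat > connector.json <<'EOF'
{
  "name" : "KinesisSourceConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.kinesis.KinesisSourceConnector",
    "tasks.max" : "1",
    "kafka.topic" : "kinesis_topic",
    "kinesis.stream" : "my_kinesis_stream"
  }
}
EOF

# Confirm the file parses as JSON and the required keys are present
python3 -c 'import json; c = json.load(open("connector.json"))["config"]; assert "kafka.topic" in c and "kinesis.stream" in c; print("connector.json OK")'
```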


Use ``curl`` to post the configuration to one of the |kconnect-long| workers. Change ``http://localhost:8083/`` to the endpoint of
one of your |kconnect-long| workers.

**Create a new connector:**

.. codewithvars:: bash

    curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

**Update an existing connector:**

.. codewithvars:: bash

    curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/KinesisSourceConnector1/config
    

.. _kinesis_quickstart:

Quick Start
-----------

The Kinesis connector is used to import data from Kinesis streams and write it into a Kafka topic. Before
you begin, create a Kinesis stream and have a user profile with `read access`_ to it.

-----------------
Preliminary Setup
-----------------

Navigate to your Confluent Platform installation directory and run this
command to install the latest connector version.

::

   confluent-hub install confluentinc/kafka-connect-kinesis:latest

You can install a specific version by replacing ``latest`` with a version
number. For example:

::

   confluent-hub install confluentinc/kafka-connect-kinesis:1.1.1-preview

Adding a new connector plugin requires restarting Connect. Use the
Confluent CLI to restart Connect:

::

   $ |confluent_stop| connect && |confluent_start| connect
   Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
   Starting zookeeper
   zookeeper is [UP]
   Starting kafka
   kafka is [UP]
   Starting schema-registry
   schema-registry is [UP]
   Starting kafka-rest
   kafka-rest is [UP]
   Starting connect
   connect is [UP]

Check if the Kinesis plugin has been installed correctly and picked up
by the plugin loader:

::

   $ curl -sS localhost:8083/connector-plugins | jq .[].class | grep kinesis
   "io.confluent.connect.kinesis.KinesisSourceConnector"

-------------
Kinesis Setup
-------------

You can use the AWS Management Console to set up your Kinesis
stream as shown `here`_ or you can complete the following steps:

1. Make sure you have an `AWS account`_.
2. Set up :ref:`awscredentials`.
3. `Create`_ a Kinesis Stream.
   ::

      aws kinesis create-stream --stream-name my_kinesis_stream --shard-count 1

4. `Insert Records`_ into your stream.
   ::

      aws kinesis put-record --stream-name my_kinesis_stream --partition-key 123 --data test-message-1

This example inserts a record with partition key ``123`` and data ``test-message-1`` into ``my_kinesis_stream``.

------------------------------
Source Connector Configuration
------------------------------

Start the services using the Confluent CLI:

.. codewithvars:: bash

   |confluent_start|

Create a configuration file named ``kinesis-source-config.json`` with the following contents:


::
   
   {
     "name": "kinesis-source",
     "config": {
       "connector.class": "io.confluent.connect.kinesis.KinesisSourceConnector",
       "tasks.max": "1",
       "kafka.topic": "kinesis_topic",
       "kinesis.region": "US_WEST_1",
       "kinesis.stream": "my_kinesis_stream",
       "confluent.license": "",
       "name": "kinesis-source",
       "confluent.topic.bootstrap.servers": "localhost:9092",
       "confluent.topic.replication.factor": "1"
     }
   }

The important configuration parameters used here are:

-  **kinesis.stream**: The Kinesis stream to subscribe to.
-  **kafka.topic**: The Kafka topic in which the messages received from Kinesis are produced.
-  **tasks.max**: The maximum number of tasks that should be created for
   this connector. Each Kinesis `shard`_ is allocated to a single
   task. If the number of shards specified exceeds the number of tasks,
   the connector throws an exception and fails.
-  **kinesis.region**: The region where the stream exists. Defaults to ``US_EAST_1`` if not specified.
-  You can pass your |aws| credentials to the Kinesis connector through
   the source connector configuration. To do so, set the
   **aws.access.key.id** and **aws.secret.key.id** parameters:
   ::

      "aws.access.key.id": "<your-access-key>",
      "aws.secret.key.id": "<your-secret-key>"
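
Because each shard is allocated to a single task, it is worth comparing the stream's shard count with ``tasks.max`` before starting the connector. A minimal sketch follows; the shard count below is a placeholder, which in practice you would read from the output of ``aws kinesis list-shards``:

```shell
# Placeholder shard count; with the AWS CLI you could obtain it from:
#   aws kinesis list-shards --stream-name my_kinesis_stream
SHARDS=2
TASKS_MAX=1

# The connector fails when the stream has more shards than tasks
if [ "$SHARDS" -gt "$TASKS_MAX" ]; then
  echo "increase tasks.max: $SHARDS shards > tasks.max=$TASKS_MAX"
fi
```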

Run this command to start the Kinesis source connector.

.. include:: ../../includes/confluent-local-consume-limit.rst

.. codewithvars:: bash

   |confluent_load| kinesis-source|dash| -d kinesis-source-config.json


To check that the connector started successfully, view the Connect
worker's log by running:

.. codewithvars:: bash

   |confluent_log| connect

Start a Kafka consumer in a separate terminal session to view the data exported by
the connector into the Kafka topic:

::

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kinesis_topic --from-beginning

Finally, stop the Confluent services using the command:

.. codewithvars:: bash

   |confluent_stop|

-----------------------
Remove unused resources
-----------------------

`Delete your stream`_ and clean up resources to avoid incurring any
unintended charges.

::

   aws kinesis delete-stream --stream-name my_kinesis_stream

.. _awscredentials:

---------------
AWS Credentials
---------------

By default, the Kinesis connector looks for AWS credentials in the following locations, in the following order:

#. The ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` environment variables accessible to the Connect worker processes where the connector will be deployed. These variables are recognized by the AWS CLI and all AWS SDKs (except the AWS SDK for .NET). Use ``export`` to set these variables:

   .. sourcecode:: bash

      export AWS_ACCESS_KEY_ID=<your_access_key_id>
      export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>

   The ``AWS_ACCESS_KEY`` and ``AWS_SECRET_KEY`` variables can be used instead, but they are not recognized by the AWS CLI.

#. The ``aws.accessKeyId`` and ``aws.secretKey`` Java system properties on the Connect worker processes where the connector will be deployed. However, these properties are only recognized by the AWS SDK for Java and are not recommended.

#. The ``~/.aws/credentials`` file located in the home directory of the operating system user that runs the Connect worker processes. These credentials are recognized by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

   .. sourcecode:: bash

      aws configure

   You can also manually create the credentials file using a text editor. The file should contain lines in the following format:

   .. sourcecode:: bash

      [default]
      aws_access_key_id = <your_access_key_id>
      aws_secret_access_key = <your_secret_access_key>

   .. note:: 

      When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user's home directory. Otherwise, the Kinesis connector will not be able to find the credentials.
      
   See `AWS Credentials File Format <https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-file-format>`_ for additional details.
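
As a self-contained illustration of the file format, the following writes a sample credentials file to a temporary path (so your real ``~/.aws/credentials`` is untouched; the key values are placeholders) and confirms it parses as INI:

```shell
# Sample credentials file at a temporary path; values are placeholders
TMPCREDS=$(mktemp)
cat > "$TMPCREDS" <<'EOF'
[default]
aws_access_key_id = AKIAEXAMPLE
aws_secret_access_key = examplesecretkey
EOF

# Parse the [default] profile to confirm the INI format is valid
python3 - "$TMPCREDS" <<'EOF'
import configparser, sys
cp = configparser.ConfigParser()
cp.read(sys.argv[1])
print(cp["default"]["aws_access_key_id"])
EOF
```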
   

Choose one of the above to define the AWS credentials that the Kinesis connectors use, verify that the credentials implementation is set correctly, and then restart all of the Connect worker processes.

.. note::

   Confluent recommends using either **environment variables** or a **credentials file** because these are the most straightforward options, and they can be checked using the AWS CLI tool before running the connector.
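
For the environment-variable option, a quick pre-flight check in the shell that will start the Connect worker might look like the following sketch (the key values are placeholders; with real credentials, ``aws sts get-caller-identity`` verifies them end to end):

```shell
# Placeholder credentials; replace with your real values
export AWS_ACCESS_KEY_ID=AKIAEXAMPLE
export AWS_SECRET_ACCESS_KEY=examplesecretkey

# Abort with an error if either variable is unset or empty
: "${AWS_ACCESS_KEY_ID:?AWS_ACCESS_KEY_ID is not set}"
: "${AWS_SECRET_ACCESS_KEY:?AWS_SECRET_ACCESS_KEY is not set}"
echo "AWS credential variables are set"
```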

All Kinesis connectors run in a single Connect worker cluster and use the same credentials. This is sufficient for many use cases. If you want more control, refer to the following section to learn more about controlling and customizing how the Kinesis connector gets AWS credentials.

---------------------
Credentials Providers
---------------------

A *credentials provider* is a Java class that implements the `com.amazonaws.auth.AWSCredentialsProvider <https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html>`__ interface in the AWS Java library and returns AWS credentials from the environment. By default, the Kinesis connector configuration property ``kinesis.credentials.provider.class`` uses the `com.amazonaws.auth.DefaultAWSCredentialsProviderChain <https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html>`__ class. This class chains together several other credentials provider classes.

The `com.amazonaws.auth.DefaultAWSCredentialsProviderChain <https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html>`__ implementation looks for credentials in the following order:

#. **Environment variables** using the `com.amazonaws.auth.EnvironmentVariableCredentialsProvider <https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/EnvironmentVariableCredentialsProvider.html>`__ class implementation. This implementation uses environment variables ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY``. Environment variables ``AWS_ACCESS_KEY`` and ``AWS_SECRET_KEY`` are also supported by this implementation; however, these two variables are only recognized by the AWS SDK for Java and are not recommended.

#. **Java system properties** using the `com.amazonaws.auth.SystemPropertiesCredentialsProvider <https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/SystemPropertiesCredentialsProvider.html>`__ class implementation. This implementation uses Java system properties ``aws.accessKeyId`` and ``aws.secretKey``.
   
#. **Credentials file** using the `com.amazonaws.auth.profile.ProfileCredentialsProvider <https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/profile/ProfileCredentialsProvider.html>`__ class implementation. This implementation uses a credentials file located in the path ``~/.aws/credentials``. This credentials provider can be used by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:

   .. sourcecode:: bash

      aws configure

   You can also manually create the credentials file using a text editor. The file should contain lines in the following format:

   .. sourcecode:: bash

      [default]
      aws_access_key_id = <your_access_key_id>
      aws_secret_access_key = <your_secret_access_key>

   .. note:: 

      When creating the credentials file, make sure that the user creating the credentials file is the same user that runs the Connect worker processes and that the credentials file is in this user's home directory. Otherwise, the Kinesis connector will not be able to find the credentials.
      
   See `AWS Credentials File Format <https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-file-format>`_ for additional details.

---------------------------
Using Other Implementations
---------------------------

You can use a different credentials provider. To do this, set the ``kinesis.credentials.provider.class`` property to the name of any class that implements the `com.amazonaws.auth.AWSCredentialsProvider <https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html>`__ interface.

.. important::

   If you are using a different credentials provider, do not include the ``aws.access.key.id`` and ``aws.secret.key.id`` parameters in the connector configuration file. If these parameters are included, they will override the custom credentials provider class.


Complete the following steps to use a different credentials provider:

#. Find or create a Java credentials provider class that implements the `com.amazonaws.auth.AWSCredentialsProvider <https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html>`__ interface.

#. Put the class file in a JAR file.

#. Place the JAR file in the ``share/java/kafka-connect-kinesis`` directory on **all Connect workers**.

#. Restart the Connect workers.

#. Change the Kinesis connector properties file to use your custom credentials provider. Add the entry ``kinesis.credentials.provider.class=<className>`` to the Kinesis connector properties file.

   .. important::
      
      You must use the fully qualified class name in the ``<className>`` entry.
      

Additional documentation
------------------------

.. toctree::
   :maxdepth: 1
   
   kinesis_source_connector_config
   changelog

.. _shard: https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#shard
.. _Delete your stream: https://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html#clean-up
.. _read access: https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-iam.html
.. _here: https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html
.. _AWS account: https://docs.aws.amazon.com/streams/latest/dev/before-you-begin.html#setting-up-sign-up-for-aws
.. _Create: https://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html#create-stream
.. _Insert Records: https://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html#put-record