Amazon Kinesis Source Connector for Confluent Cloud

Note

If you are installing the connector locally for Confluent Platform, see Amazon Kinesis Source Connector for Confluent Platform.

The Kafka Connect Amazon Kinesis Source connector is used to pull data from Amazon Kinesis and persist the data to an Apache Kafka® topic.

Important

Once this connector moves from Preview to Generally Availability (GA), it will require a subscription for Confluent Cloud commitment for Confluent Cloud Enterprise customers. Without a Confluent Cloud commitment, Confluent Cloud Enterprise customers will not have access to these connectors in GA. Contact your Confluent Account Executive to learn more and to update your subscription, if necessary.

Features

The Amazon Kinesis Source connector for Confluent Cloud provides the following features:

  • Fetches records from all shards in one Kinesis stream.
  • Select configuration properties:
    • Offset position:
      • AT_SEQUENCE_NUMBER
      • AFTER_SEQUENCE_NUMBER
      • TRIM_HORIZON
      • LATEST
      • AT_TIMESTAMP
    • Other properties:
      • kinesis.region
      • kinesis.record.limit
      • kinesis.throughput.exceeded.backoff.ms

Configuration properties that are not shown in the Confluent Cloud UI use the default values. For default values and property definitions, see Kinesis Source Connector Configuration Properties.

For more information, see the Confluent Cloud connector limitations.

Caution

Preview connectors are not currently supported and are not recommended for production use. For specific connector limitations, see Cloud connector limitations.

Quick Start

Use this quick start to get up and running with the Confluent Cloud Kinesis source connector. The quick start shows how to select the connector and configure it to pull data from Amazon Kinesis and persist the data to an Apache Kafka® topic. It then monitors and records all subsequent row-level changes.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS).
  • At least one topic must exist before creating the connector.
  • The Confluent Cloud CLI installed and configured for the cluster. See Install and Configure the Confluent Cloud CLI.
  • An AWS account configured with Access Keys. You use these access keys when setting up the connector.
  • An available Amazon Kinesis Data Stream.
  • Use one of the following for the Kafka cluster credentials fields:
    • A Confluent Cloud API key and secret. After you have created your cluster, go to Cluster settings > API access > Create Key.
    • A Confluent Cloud service account.

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud KSQL, see the Cloud ETL demo. The demo also shows how to use Confluent Cloud CLI to manage your resources in Confluent Cloud.

Using the Confluent Cloud GUI

Step 1: Launch your Confluent Cloud cluster.

See the Confluent Cloud Quick Start for installation instructions.

Step 2: Add a connector.

Click Connectors > Add connector.

Add a connector

Step 3: Select your connector.

Click the Kinesis Source connector icon.

Step 4: Set up the connection.

Complete the following and click Continue.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk (*) designates a required entry.
  1. Enter a connector name.

  2. Enter your Kafka Cluster credentials. The credentials are either the API key and secret or the service account API key and secret.

  3. Enter the topic name or topic names where you want to send data.

  4. Enter your AWS credentials.

  5. Add the Kinesis details. The stream offset position is where messages start being consumed from the Kinesis stream. Available offset positions are:

    • AT_SEQUENCE_NUMBER
    • AFTER_SEQUENCE_NUMBER
    • TRIM_HORIZON
    • LATEST
    • AT_TIMESTAMP

    To learn more about how these positions are defined, see GetShardIterator .

  6. Add the Connection details for your connection to the stream.

  7. Enter the number of tasks in use by the connector. Refer to Confluent Cloud connector limitations for additional information.

Configuration properties that are not shown in the Confluent Cloud UI use the default values. For default values and property definitions, see Kinesis Source Connector Configuration Properties.

Step 5: Launch the connector.

Verify the connection details and click Launch.

Launch the connector

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running. It may take a few minutes.

Check the connector status

Step 7: Check the Kafka topic.

After the connector is running, verify that messages are populating your Kafka topic.

For additional information about this connector, see Amazon Kinesis Source Connector for Confluent Platform. Note that not all Confluent Platform connector features are provided in the Confluent Cloud connector.

Using the Confluent Cloud CLI

Complete the following steps to set up and run the connector using the Confluent Cloud CLI.

Note

Make sure you have all your prerequisites completed.

Important

You must create topic names before before creating and launching this connector. For this Quick Start example, the database table being sourced is named kinesis-testing. Before starting these steps, make sure you create a Kafka topic named kinesis-testing using the command below:

ccloud kafka topic create kinesis-testing

Step 1: List the available connectors.

Enter the following command to list available connectors:

ccloud connector-catalog list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

ccloud connector-catalog describe <connector-catalog-name>

For example:

ccloud connector-catalog describe KinesisSource

Example output:

Following are the required configs:
connector.class
name
kafka.api.key
kafka.api.secret
kafka.topic
aws.access.key.id
kinesis.region
aws.secret.key.id
kinesis.stream
kinesis.position
tasks.max

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
    "name" : "confluent-kinesis-source",
    "connector.class": "KinesisSource",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret" : "<my-kafka-api-secret>",
    "kafka.topic" : "kinesis-testing",
    "aws.access.key.id" : "<my-aws-access-key>",
    "aws.secret.key.id": "<my-aws-access-key-secret>",
    "kinesis.stream": "my-kinesis-stream",
    "kinesis.region" : "us-west-2",
    "kinesis.position": "TRIM_HORIZON",
    "tasks.max" : "1"
}

Note the following property definitions:

  • "name": Sets a name for your new connector.

  • "connector.class": Identifies the connector plugin name.

  • "kinesis.region": Identifies the AWS region where the Kinesis data stream is located. Examples are us-west-2, us-east-2, ap-northeast-1, eu-central-1, and so on.

  • "kinesis.position": Identifies the stream offset position. This is where messages start being consumed from the Kinesis stream. Available offset positions are:

    • AT_SEQUENCE_NUMBER
    • AFTER_SEQUENCE_NUMBER
    • TRIM_HORIZON
    • LATEST
    • AT_TIMESTAMP

    To learn more about how these positions are defined, see GetShardIterator.

    Note

    Configuration properties that are not listed use the default values. For default values and property definitions, see Kinesis Source Connector Configuration Properties.

Step 4: Load the properties file and create the connector.

Enter the following command to load the configuration and start the connector:

ccloud connector create --config <file-name>.json

For example:

ccloud connector create --config kinesis-source.json

Example output:

Created connector confluent-kinesis-source jtt-ix4dl

Step 5: Check the connector status.

Enter the following command to check the connector status:

ccloud connector list

Example output:

ID          |           Name           | Status  |  Type
+-----------+--------------------------+---------+--------+
jtt-ix4dl   | confluent-kinesis-source | RUNNING | source

Step 6: Check the Kafka topic.

After the connector is running, verify that messages are populating your Kafka topic.

For additional information about this connector, see Amazon Kinesis Source Connector for Confluent Platform. Note that not all Confluent Platform connector features are provided in the Confluent Cloud connector.

Next Steps

Try out a Confluent Cloud demo.