Amazon Kinesis Source Connector for Confluent Cloud

Note

If you are installing the connector locally for Confluent Platform, see Amazon Kinesis Source Connector for Confluent Platform.

The Kafka Connect Amazon Kinesis Source connector for Confluent Cloud pulls data from Amazon Kinesis and persists the data to an Apache Kafka® topic.

Features

Important

Confluent Cloud Enterprise customers must have a Confluent Cloud annual commitment to use this connector. Contact your Confluent Account Executive to learn more and to update your subscription, if necessary.

The Amazon Kinesis Source connector provides the following features:

  • Fetches records from all shards in one Kinesis stream.
  • Select configuration properties:
    • Offset position:
      • AT_TIMESTAMP
      • LATEST
      • TRIM_HORIZON
      • kinesis.shard.timestamp.ms
    • Other properties:
      • kinesis.region
      • kinesis.record.limit
      • kinesis.throughput.exceeded.backoff.ms

Configuration properties that are not shown in the Confluent Cloud UI use the default values. For default values and property definitions, see Kinesis Source Connector Configuration Properties.

For more information, see the Confluent Cloud connector limitations.
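The three offset positions listed above differ only in where reading starts. The following Python sketch is a toy model of that difference for a single shard, not the connector's implementation. The names Record and records_to_read are hypothetical, and the LATEST branch assumes the connector follows the Kinesis shard iterator behavior of reading only records that arrive after reading begins.

```python
from typing import List, NamedTuple, Optional


class Record(NamedTuple):
    arrival_ms: int  # arrival time in the shard, epoch milliseconds
    data: str


def records_to_read(
    records: List[Record],
    position: str,
    start_ms: int,
    timestamp_ms: Optional[int] = None,
) -> List[Record]:
    """Toy model: which records of one shard are read for a given position.

    `records` must be ordered oldest first; `start_ms` is when reading begins.
    """
    if position == "TRIM_HORIZON":
        # Oldest untrimmed record onward, that is, everything still in the shard.
        return list(records)
    if position == "LATEST":
        # Assumption (Kinesis iterator semantics): only records that arrive
        # after reading begins.
        return [r for r in records if r.arrival_ms > start_ms]
    if position == "AT_TIMESTAMP":
        if timestamp_ms is None:
            raise ValueError("AT_TIMESTAMP needs a timestamp")
        # Records at or after the requested point in time.
        return [r for r in records if r.arrival_ms >= timestamp_ms]
    raise ValueError(f"unknown position: {position}")


shard = [Record(1000, "a"), Record(2000, "b"), Record(3000, "c")]
print([r.data for r in records_to_read(shard, "TRIM_HORIZON", start_ms=2500)])
# prints ['a', 'b', 'c']
print([r.data for r in records_to_read(shard, "LATEST", start_ms=2500)])
# prints ['c']
print([r.data for r in records_to_read(shard, "AT_TIMESTAMP", start_ms=2500, timestamp_ms=2000)])
# prints ['b', 'c']
```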

Quick Start

Use this quick start to get up and running with the Confluent Cloud Kinesis source connector. The quick start shows how to select the connector and configure it to pull data from Amazon Kinesis and persist the data to an Apache Kafka® topic.

Prerequisites
  • Kafka cluster credentials. You can use one of the following ways to get credentials:
    • Create a Confluent Cloud API key and secret. To create them, go to Kafka API keys in your cluster, or autogenerate the API key and secret directly in the UI when setting up the connector.
    • Create a Confluent Cloud service account for the connector.

Using the Confluent Cloud GUI

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.

Step 2: Add a connector.

Click Connectors. If you already have connectors in your cluster, click Add connector.

Step 3: Select your connector.

Click the Amazon Kinesis Source connector icon.

Step 4: Set up the connection.

Complete the following and click Continue.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.

  1. Enter a connector name.
  2. Enter your Kafka Cluster credentials. The credentials are either the API key and secret or the service account API key and secret.
  3. Enter the topic name or topic names where you want to send data.
  4. Enter your AWS credentials.
  5. Add the Kinesis details.
    • (Optional) The stream offset position is where the connector starts consuming messages from the Kinesis stream. If no position is selected, the default is TRIM_HORIZON. Available offset positions are:
      • AT_TIMESTAMP: Get records starting at a point in time. Used with the timestamp described below.
      • LATEST: Start with the most recent record.
      • TRIM_HORIZON (default): Start with the last untrimmed record (the oldest record).
    • (Optional) The timestamp to use when AT_TIMESTAMP is selected. Allowed formats are the simple date-time format yyyy-MM-dd'T'HH:mm:ss.SSSXXX or epoch time in milliseconds.
  6. Add the connection details for the Kinesis stream.
  7. Enter the maximum number of connector tasks.

Configuration properties that are not shown in the Confluent Cloud UI use the default values. For default values and property definitions, see Kinesis Source Connector Configuration Properties.
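If you want to supply the AT_TIMESTAMP value as epoch time in milliseconds, you need to convert a date-time first. The following Python sketch shows one way to do that. The function name to_epoch_millis is hypothetical, and the sketch assumes the input carries an explicit UTC offset such as +00:00 or -07:00.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(text: str) -> int:
    """Convert an ISO 8601 date-time with a UTC offset to epoch milliseconds."""
    moment = datetime.fromisoformat(text)
    if moment.tzinfo is None:
        # Without an offset the instant is ambiguous, so refuse to guess.
        raise ValueError("include a UTC offset, for example +00:00")
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


print(to_epoch_millis("2020-05-28T19:09:38.237+00:00"))  # prints 1590692978237
print(to_epoch_millis("2020-05-28T12:09:38.237-07:00"))  # same instant, prints 1590692978237
```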

Step 5: Launch the connector.

Verify the connection details and click Launch.

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running. It may take a few minutes.

Step 7: Check the Kafka topic.

After the connector is running, verify that messages are populating your Kafka topic.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Confluent Cloud Dead Letter Queue for details.

For additional information about this connector, see Amazon Kinesis Source Connector for Confluent Platform. Note that not all Confluent Platform connector features are provided in the Confluent Cloud connector.

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL example. This example also shows how to use Confluent Cloud CLI to manage your resources in Confluent Cloud.

Using the Confluent Cloud CLI

Complete the following steps to set up and run the connector using the Confluent Cloud CLI.

Note

Make sure you have all your prerequisites completed.

Important

You must create topics before creating and launching this connector. This Quick Start example writes to a Kafka topic named kinesis-testing. Before starting these steps, create the topic using the following command:

ccloud kafka topic create kinesis-testing

Step 1: List the available connectors.

Enter the following command to list available connectors:

ccloud connector-catalog list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

ccloud connector-catalog describe <connector-catalog-name>

For example:

ccloud connector-catalog describe KinesisSource

Example output:

Following are the required configs:
connector.class
name
kafka.api.key
kafka.api.secret
kafka.topic
aws.access.key.id
kinesis.region
aws.secret.key.id
kinesis.stream
kinesis.position
tasks.max

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
    "name": "confluent-kinesis-source",
    "connector.class": "KinesisSource",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "kafka.topic": "kinesis-testing",
    "aws.access.key.id": "<my-aws-access-key>",
    "aws.secret.key.id": "<my-aws-access-key-secret>",
    "kinesis.stream": "my-kinesis-stream",
    "kinesis.region": "us-west-2",
    "kinesis.position": "AT_TIMESTAMP",
    "kinesis.shard.timestamp.ms": "1590692978237",
    "tasks.max": "1"
}

Note the following property definitions:

  • "name": Sets a name for your new connector.

  • "connector.class": Identifies the connector plugin name.

  • "kinesis.region": Identifies the AWS region where the Kinesis data stream is located. Examples are us-west-2, us-east-2, ap-northeast-1, eu-central-1, and so on.

  • (Optional) "kinesis.position": Identifies the stream offset position. This is where the connector starts consuming messages from the Kinesis stream. Available offset positions are:

    • AT_TIMESTAMP: Get records starting at a point in time. Used with "kinesis.shard.timestamp.ms", described below.
    • LATEST: Start with the most recent record.
    • TRIM_HORIZON (default): Start with the last untrimmed record (the oldest record).

  • (Optional) "kinesis.shard.timestamp.ms": The timestamp to use when AT_TIMESTAMP is selected. Allowed formats are the simple date-time format yyyy-MM-dd'T'HH:mm:ss.SSSXXX or epoch time in milliseconds.

  • "tasks.max": The maximum number of connector tasks.

    Note

    Configuration properties that are not listed use the default values. For default values and property definitions, see Kinesis Source Connector Configuration Properties.
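Before loading the configuration file, it can help to check it for JSON syntax errors and missing properties. The following Python sketch is one way to do this, not part of the Confluent Cloud CLI. The function name check_config is hypothetical, the required keys are copied from the example output in Step 2, and the AT_TIMESTAMP check is an assumption based on the property definitions above.

```python
import json
from typing import List

# Required keys copied from the example output of
# "ccloud connector-catalog describe KinesisSource" in Step 2.
REQUIRED_KEYS = [
    "connector.class", "name", "kafka.api.key", "kafka.api.secret",
    "kafka.topic", "aws.access.key.id", "kinesis.region",
    "aws.secret.key.id", "kinesis.stream", "kinesis.position", "tasks.max",
]


def check_config(text: str) -> List[str]:
    """Return a list of problems found in the connector configuration JSON."""
    try:
        config = json.loads(text)
    except json.JSONDecodeError as err:
        # A missing comma between properties is reported here.
        return [f"invalid JSON at line {err.lineno}: {err.msg}"]
    if not isinstance(config, dict):
        return ["top-level value must be a JSON object"]
    problems = [
        f"missing required key: {key}" for key in REQUIRED_KEYS if key not in config
    ]
    # Assumption: AT_TIMESTAMP is only useful together with a timestamp value.
    if (config.get("kinesis.position") == "AT_TIMESTAMP"
            and "kinesis.shard.timestamp.ms" not in config):
        problems.append(
            "kinesis.position is AT_TIMESTAMP but kinesis.shard.timestamp.ms is not set"
        )
    return problems


# Example: an incomplete configuration reports each missing key.
sample = '{"name": "confluent-kinesis-source", "tasks.max": "1"}'
for problem in check_config(sample):
    print(problem)
```

To check a file such as kinesis-source.json, read its contents and pass the text to check_config. An empty list means no problems were found by these checks.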

Step 4: Load the configuration file and create the connector.

Enter the following command to load the configuration and start the connector:

ccloud connector create --config <file-name>.json

For example:

ccloud connector create --config kinesis-source.json

Example output:

Created connector confluent-kinesis-source lcc-ix4dl

Step 5: Check the connector status.

Enter the following command to check the connector status:

ccloud connector list

Example output:

ID          |           Name           | Status  |  Type
+-----------+--------------------------+---------+--------+
lcc-ix4dl   | confluent-kinesis-source | RUNNING | source
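
If you script this check, the table can be parsed into rows. The following Python sketch assumes the output has exactly the layout shown in the example above, which may differ between CLI versions. The function name parse_connector_list is hypothetical.

```python
from typing import Dict, List


def parse_connector_list(output: str) -> List[Dict[str, str]]:
    """Parse the pipe-separated table shown in the example output into dicts."""
    lines = [line for line in output.splitlines() if line.strip()]
    if not lines:
        return []
    header = [cell.strip() for cell in lines[0].split("|")]
    rows = []
    for line in lines[1:]:
        # Skip the separator row made only of "+" and "-" characters.
        if set(line.strip()) <= set("+-"):
            continue
        cells = [cell.strip() for cell in line.split("|")]
        rows.append(dict(zip(header, cells)))
    return rows


SAMPLE = """\
ID          |           Name           | Status  |  Type
+-----------+--------------------------+---------+--------+
lcc-ix4dl   | confluent-kinesis-source | RUNNING | source
"""

for row in parse_connector_list(SAMPLE):
    print(row["Name"], row["Status"])  # prints: confluent-kinesis-source RUNNING
```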

Step 6: Check the Kafka topic.

After the connector is running, verify that messages are populating your Kafka topic.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Confluent Cloud Dead Letter Queue for details.

For additional information about this connector, see Amazon Kinesis Source Connector for Confluent Platform. Note that not all Confluent Platform connector features are provided in the Confluent Cloud connector.

Suggested Reading

The following blog post includes steps to set up an example pipeline to get a mock payments stream from Amazon Kinesis into Confluent Cloud using the Confluent Cloud Amazon Kinesis Source connector.

Blog post: How Merging Companies Will Give Rise to Unified Data Streams

Next Steps

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL example. This example also shows how to use Confluent Cloud CLI to manage your resources in Confluent Cloud.
