Kafka Connect Kinesis Source Connector

The Kinesis Source Connector is used to pull data from Amazon Kinesis and persist the data to an Apache Kafka® topic.

Install the Kinesis Connector

You can install this connector with the Confluent Hub client (recommended) or by manually downloading the ZIP file.

Install Connector Using Confluent Hub

Prerequisite
The Confluent Hub Client must be installed. It is installed by default with Confluent Platform commercial features.

Navigate to your Confluent Platform installation directory and run this command to install the latest connector version (the latest tag). The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-kinesis:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-kinesis:1.1.4
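
After installing and restarting the worker, one way to confirm that the plugin was picked up is to look for the connector class in the list returned by the Kafka Connect REST API's connector-plugins endpoint. The following is a minimal sketch, assuming a worker listening on http://localhost:8083; the CONNECT_URL variable and the has_kinesis_plugin helper are illustrative names, not part of any Confluent tooling.

```shell
# Assumption: a Connect worker is reachable here; override CONNECT_URL if not.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Hypothetical helper: exits 0 if the Kinesis source connector class appears
# in the plugin list JSON read from stdin, non-zero otherwise.
has_kinesis_plugin() {
  grep -qF 'io.confluent.connect.kinesis.KinesisSourceConnector'
}

# Usage (requires a running worker):
#   curl -s "$CONNECT_URL/connector-plugins" | has_kinesis_plugin && echo "installed"
```

If the class is not listed, check that the connector was installed on the machine running that worker and that the worker was restarted afterwards.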

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

This connector is available under the Confluent Software Evaluation License. You can use this connector for a 30-day trial period. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

For more information, see License Topic Configuration.

Usage Notes

The default credentials provider is DefaultAWSCredentialsProviderChain. For more information, see the AWS documentation.
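
As one illustration, the default provider chain in the AWS SDK for Java checks several locations in order, and the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are among them. The sketch below assumes that aws.access.key.id and aws.secret.key.id are left unset in the connector configuration and that the variables are exported in the environment of the process that starts the Connect worker. The values shown are placeholders.

```shell
# Placeholders only: substitute real credentials and keep them out of version control.
# These must be set in the environment that launches the Connect worker.
export AWS_ACCESS_KEY_ID="<your-access-key-id>"
export AWS_SECRET_ACCESS_KEY="<your-secret-access-key>"
```

Other sources in the chain, such as a shared credentials file or an instance profile, may be a better fit for production deployments.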

Examples

Streaming ETL Demo

To evaluate the Kafka Connect Kinesis source connector, S3 sink connector, and GCS sink connector in an end-to-end streaming deployment, refer to the AWS Kinesis > Confluent Cloud > Cloud Storage pipeline demo on GitHub. This demo also allows you to evaluate the real-time data processing capabilities of Confluent KSQL.

Property-based example

This configuration is typically used with standalone workers.

name=KinesisSourceConnector1
connector.class=io.confluent.connect.kinesis.KinesisSourceConnector
tasks.max=1
aws.access.key.id=< Optional Configuration >
aws.secret.key.id=< Optional Configuration >
kafka.topic=< Required Configuration >
kinesis.stream=< Required Configuration >
kinesis.region=< Optional Configuration - defaults to US_EAST_1 >
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
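
A possible way to use these properties with a standalone worker is sketched below. The file name kinesis-source.properties, the CONNECTOR_PROPS variable, and the worker properties path are assumptions for illustration. The optional properties are omitted, and the values in angle brackets are placeholders that must be replaced.

```shell
# Hypothetical file name; placeholders in <...> must be replaced before use.
CONNECTOR_PROPS="${CONNECTOR_PROPS:-kinesis-source.properties}"

# Write the required connector properties to the file.
cat > "$CONNECTOR_PROPS" <<'EOF'
name=KinesisSourceConnector1
connector.class=io.confluent.connect.kinesis.KinesisSourceConnector
tasks.max=1
kafka.topic=<your-kafka-topic>
kinesis.stream=<your-kinesis-stream>
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
EOF

# Start a standalone worker with it (the worker properties path is an assumption):
#   connect-standalone etc/kafka/connect-standalone.properties "$CONNECTOR_PROPS"
```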

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST API documentation.

Connect Distributed REST example
{
  "name" : "KinesisSourceConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.kinesis.KinesisSourceConnector",
    "tasks.max" : "1",
    "aws.access.key.id" : "< Optional Configuration >",
    "aws.secret.key.id" : "< Optional Configuration >",
    "kafka.topic" : "< Required Configuration >",
    "kinesis.stream" : "< Required Configuration >"
  }
}

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

Create a new connector
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
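Update an existing connector. The PUT endpoint expects only the flat configuration map (the contents of the "config" object) as the request body, so save that object to a separate file, for example connector-config.json
curl -s -X PUT -H 'Content-Type: application/json' --data @connector-config.json http://localhost:8083/connectors/KinesisSourceConnector1/config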
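
After creating or updating the connector, its state can be checked through the status endpoint of the Kafka Connect REST API. The sketch below assumes the same worker address and connector name used in the commands above; CONNECT_URL, CONNECTOR_NAME, and the status_url helper are illustrative names only.

```shell
# Assumptions: worker address and connector name match the examples above.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"
CONNECTOR_NAME="KinesisSourceConnector1"

# Hypothetical helper: print the status endpoint URL for a connector name,
# stripping one trailing slash from CONNECT_URL if present.
status_url() {
  printf '%s/connectors/%s/status\n' "${CONNECT_URL%/}" "$1"
}

# Usage (requires a running worker):
#   curl -s "$(status_url "$CONNECTOR_NAME")"
```

The response is a JSON document describing the state of the connector and of each of its tasks.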