Amazon Redshift Sink Connector for Confluent Platform¶
The Kafka Connect Amazon Redshift Sink connector allows you to export data from Apache Kafka® topics to Amazon Redshift. The connector polls data from the subscribed Kafka topics and writes it to an Amazon Redshift database. Auto-creation of tables and limited auto-evolution are supported.
Features¶
The Amazon Redshift Sink connector for Confluent Platform includes the following features:
At least once delivery¶
This connector guarantees that records from the Kafka topic are delivered at least once.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
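For example, a minimal sketch of the DLQ-related properties in a sink connector configuration might look like the following. These are standard Kafka Connect error-handling settings; the topic name dlq-redshift-sink is only an example value.

# Tolerate record-level errors instead of failing the task
errors.tolerance=all
# Route failed records to a dead letter queue topic (example name)
errors.deadletterqueue.topic.name=dlq-redshift-sink
errors.deadletterqueue.topic.replication.factor=1
# Add headers describing the failure to each DLQ record
errors.deadletterqueue.context.headers.enable=true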
Multiple tasks¶
The Amazon Redshift Sink connector supports running one or more tasks. You can specify the number of tasks with the tasks.max configuration parameter. Running multiple tasks can improve performance when the connector consumes from multiple topic partitions.
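For example, the following setting (a hypothetical value; the useful upper bound is the number of partitions in the subscribed topics) allows up to three tasks:

# Run up to three tasks in parallel (example value)
tasks.max=3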
Data mapping¶
The sink connector requires knowledge of schemas, so you should use a suitable converter (for example, the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled). Kafka record keys, if present, can be primitive types or a Connect struct, and the record value must be a Connect struct. Fields selected from Connect structs must be of primitive types. If the data in the topic is not of a compatible format, implementing a custom Converter or using Single Message Transforms (SMTs) may be necessary.
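For example, the following connector-level converter settings use the Avro converter with Schema Registry; the Schema Registry URL shown is a local example, and the commented lines show the JSON converter alternative with schemas enabled.

# Avro converter backed by Schema Registry (example local URL)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Alternative: JSON converter with embedded schemas
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=true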
Key handling¶
By default, primary keys are not extracted (pk.mode is set to none), which is not suitable for advanced usage such as upsert semantics or when the connector is responsible for auto-creating the destination table. Other modes let the connector use fields from the Kafka record key, the Kafka record value, or the Kafka coordinates of the record as the primary key. Refer to the primary key configuration options for more details.
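As a sketch, the following settings extract the primary key from a field of the Kafka record value; the field name id is hypothetical and must exist in your value schema.

# Use a field from the record value as the primary key
pk.mode=record_value
pk.fields=id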
Delete mode¶
The connector can delete rows in a database table when it consumes a tombstone record, which is a Kafka record that has a non-null key and a null value. This behavior is disabled by default, meaning that any tombstone record results in a connector failure, which makes it easy to upgrade the connector and keep prior behavior.
You can enable deletes by setting delete.enabled=true when pk.mode is set to record_key. This is required because deleting a row from the table uses the primary key as the criteria. Enabling delete mode does not affect insert.mode.
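A minimal sketch of a delete-enabled configuration, assuming the record key is a struct with a hypothetical id field, might look like the following.

# Tombstone records (non-null key, null value) delete the matching row
delete.enabled=true
pk.mode=record_key
pk.fields=id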
Auto-creation and auto-evolution¶
Tip
Make sure the JDBC user has the appropriate permissions for DDL.
If auto.create is enabled, the connector can create the destination table if it is found to be missing. The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition. Primary keys are specified based on the key configuration settings.
If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table. Addition of primary key constraints is also not attempted.
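For example, enabling both behaviors in the connector configuration is a matter of two properties:

# Create missing destination tables from the record schema
auto.create=true
# Add nullable columns for new optional record fields
auto.evolve=true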
For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema, and default values are also specified based on the default value of the corresponding field, if applicable. The connector uses the following mapping from Connect schema types to Redshift data types:

| Schema Type | Redshift |
|---|---|
| INT8 | SMALLINT |
| INT16 | SMALLINT |
| INT32 | INT |
| INT64 | BIGINT |
| FLOAT32 | REAL |
| FLOAT64 | DOUBLE PRECISION |
| BOOLEAN | BOOLEAN |
| ‘Decimal’ | DECIMAL |
| ‘Date’ | DATE |
| ‘Time’ | TIME |
| ‘Timestamp’ | TIMESTAMP |
| BYTES | Not supported |
| ‘Struct’ | Not supported |
| ‘Map’ | Not supported |
| ‘Array’ | Not supported |
Important
For backward-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.
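For illustration, adding a field such as the hypothetical status field below to an Avro value schema is backward compatible, because the field is optional (nullable) and has a default, so auto-evolution can add the corresponding nullable column:

{"name": "status", "type": ["null", "string"], "default": null}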
Limitations¶
The Amazon Redshift Sink connector does not support Avro schemas that contain decimal logical types. For a better understanding of numeric data types, see the blog post Bytes, Decimals, Numerics and oh my.
Install the Amazon Redshift connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later
- Java 1.8
- At minimum, the INSERT access privilege is required for this connector. See Amazon Redshift Grant. If delete.enabled=true, the DELETE access privilege is also required. The connector configuration requires a Redshift user (and password) with Redshift database privileges. For example:

CREATE DATABASE <DB_NAME>;
CREATE USER <DB_USER> PASSWORD '<DB_PASSWORD>';
GRANT USAGE ON SCHEMA public TO <DB_USER>;
GRANT CREATE ON SCHEMA public TO <DB_USER>;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO <DB_USER>;
GRANT ALL ON SCHEMA public TO <DB_USER>;
GRANT CREATE ON DATABASE <DB_NAME> TO <DB_USER>;
For more information, see the Redshift docs.
Install the connector using the Confluent CLI¶
To install the latest connector version using the Confluent CLI, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-aws-redshift:latest

You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-aws-redshift:1.2.2
If you are running a multi-node Connect cluster, the Redshift connector and JDBC driver JARs must be installed on every Connect worker in the cluster.
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license and for information about the license topic, see License topic configuration.
Configuration properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Amazon Redshift Sink Connector for Confluent Platform.
Quick start¶
To see the basic functionality of the connector, this quick start demonstrates how to copy Avro data from a single topic to a Redshift instance.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Prerequisites¶
- Confluent Platform is installed and services are running by using the Confluent CLI.
- Kafka and Schema Registry are running locally on the default ports.
Note that this quick start assumes that you are using the Confluent CLI commands; however, standalone installations are also supported. By default, ZooKeeper, Kafka, Schema Registry, Kafka Connect REST API, and Kafka Connect are started with the confluent local services start command. As of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments, and Confluent recommends KRaft mode for new deployments.
Create an Amazon Redshift instance¶
- Log into your AWS Management Console.
- Navigate to Redshift. Note that your account needs permission to create and administer Redshift instances. If you see User <you> is not authorized to describe clusters, then you must contact your account administrator to set up your Redshift cluster.
- Navigate to Clusters.
- Click Quick Launch Cluster.
- Set the Master User Password. You must remember this password for a later step.
- Click Launch Cluster to complete the setup.
- Wait for your cluster to be in the “available” state (this takes approximately 5 minutes). Note that you need the information in the Cluster Configuration screen to complete the connector configuration.
Load the Amazon Redshift Sink connector¶
Create a properties file for your Redshift Sink connector.
name=redshift-sink
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
connector.class=io.confluent.connect.aws.redshift.RedshiftSinkConnector
tasks.max=1
topics=orders
aws.redshift.domain=< Required Configuration >
aws.redshift.port=< Required Configuration >
aws.redshift.database=< Required Configuration >
aws.redshift.user=< Required Configuration >
aws.redshift.password=< Required Configuration >
pk.mode=kafka
auto.create=true
Fill in the configuration parameters of your cluster as they appear in your Cluster Details.
Load the redshift-sink connector:

Caution

You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local services connect connector load redshift-sink --config redshift-sink.properties
Your output should resemble the following:
{ "name": "redshift-sink", "config": { "confluent.topic.bootstrap.servers": "localhost:9092", "connector.class": "io.confluent.connect.aws.redshift.RedshiftSinkConnector", "tasks.max": "1", "topics": "orders", "aws.redshift.domain": "cluster-name.cluster-id.region.redshift.amazonaws.com", "aws.redshift.port": "5439", "aws.redshift.database": "dev", "aws.redshift.user": "awsuser", "aws.redshift.password": "your-password", "auto.create": "true", "pk.mode": "kafka", "name": "redshift-sink" }, "tasks": [], "type": "sink" }
Note that non-CLI users can load the Redshift Sink connector by using the following command:
${CONFLUENT_HOME}/bin/connect-standalone \
${CONFLUENT_HOME}/etc/schema-registry/connect-avro-standalone.properties \
redshift-sink.properties
Produce a record in Kafka¶
Produce a record into the orders topic:

./bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price", "type": "float"}]}'
The console producer waits for input.
Copy and paste the following record into the terminal and press Enter:
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
Open the Query Editor and run the following query:
SELECT * from orders;
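Because this quick start uses pk.mode=kafka with auto.create=true, the auto-created table also stores the Kafka coordinates of each record. Assuming the connector's default coordinate column names (__connect_topic, __connect_partition, and __connect_offset, configurable with pk.fields), a more targeted check might look like the following:

-- Assumes the default Kafka-coordinate column names created when pk.mode=kafka
SELECT __connect_topic, __connect_partition, __connect_offset, id, product, quantity, price
FROM orders
WHERE id = 999;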