.. _connect_redshift:

Amazon Redshift Sink Connector for |cp|
=======================================

.. note:: If you are using |ccloud|, see https://docs.confluent.io/cloud/current/connectors/cc-amazon-redshift-sink.html for the cloud Quick Start.

The |kconnect-long| Amazon Redshift Sink connector allows you to export data from |ak-tm| topics to Amazon Redshift. The connector polls data from |ak| and writes this data to an Amazon Redshift database. Polling data is based on the subscribed topics. Auto-creation of tables and limited auto-evolution are supported.

Prerequisites
-------------

The following are required to run the |kconnect-long| Amazon Redshift Sink connector:

* |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* Java 1.8
* At minimum, the ``INSERT`` access privilege is required for this connector. See `Amazon Redshift Grant `__. If ``delete.enabled=true``, the ``DELETE`` access privilege is also required.

Install the Amazon Redshift Connector
-------------------------------------

.. include:: ../includes/connector-install-hub.rst

::

   confluent-hub install confluentinc/kafka-connect-aws-redshift:latest

.. include:: ../includes/connector-install-version.rst

::

   confluent-hub install confluentinc/kafka-connect-aws-redshift:1.0.0-preview

If you are running a multi-node |kconnect| cluster, the Redshift connector and JDBC driver JARs must be installed on every |kconnect| worker in the cluster. See below for details.

--------------------------
Install Connector Manually
--------------------------

`Download and extract the ZIP file `__ for your connector and then follow the manual connector installation :ref:`instructions `.

-----------------------------------
Installing the Redshift JDBC Driver
-----------------------------------

The Redshift Sink connector uses the `Java Database Connectivity (JDBC) API `__ to connect to Amazon Redshift. For this to work, the connector must have a *JDBC driver* for Redshift.

#. Click `Here `_ to download the Redshift JDBC drivers.
#. Find the latest JDBC 4.0 driver JAR file that comes *with* the |aws| SDK.
#. Place this JAR file into the ``share/confluent-hub-components/confluentinc-kafka-connect-aws-redshift/lib`` directory in your |cp| installation on each of the |kconnect| worker nodes.
#. Restart all of the |kconnect| worker nodes.

.. note::

   * Because this connector uses the Redshift JDBC driver for database authentication, you must have the |aws| SDK for Java 1.11.118 or later in your Java class path. If you don't have the |aws| SDK for Java installed, you can use a driver that includes the |aws| SDK. For more information, see `Previous JDBC Driver Versions With the AWS SDK for Java `__.
   * The ``share/confluent-hub-components/confluentinc-kafka-connect-aws-redshift/lib`` directory mentioned above is for |cp| when this connector is installed through Confluent Hub ("confluent-hub install confluentinc/kafka-connect-aws-redshift:latest"). If you are using a different installation, find the location where the Confluent connector JAR files are located, and place the JDBC driver JAR file into the same directory.
   * If the JDBC driver is not installed correctly, the Redshift connector fails on startup. Typically, the system throws the error ``No suitable driver found``. If this happens, install the JDBC driver again by following the instructions above.
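If your workers load connectors from a non-default location, also confirm that the worker ``plugin.path`` includes the directory that contains the connector and its bundled JDBC driver. The fragment below is a minimal sketch of the relevant worker setting, assuming a Confluent Hub style layout; the paths shown are illustrative and must be adjusted to match your installation.

::

   # Illustrative worker configuration fragment; paths depend on your installation.
   # plugin.path must include the parent directory of
   # confluentinc-kafka-connect-aws-redshift, whose lib/ folder holds the
   # connector JARs and the Redshift JDBC driver JAR.
   plugin.path=/usr/share/java,/path/to/confluent/share/confluent-hub-components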
License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`aws-redshift-sink-connector-license-config` for license properties and :ref:`redshift-sink-license-topic-configuration` for information about the license topic.

.. _connect_redshift-sink:

Quick Start
-----------

To see the basic functionality of the connector, we'll be copying Avro data from a single topic to a Redshift instance.

**Prerequisites:**

- :ref:`Confluent Platform ` is installed and services are running by using the |confluent-cli| :ref:`cli` commands.

  .. note::

     This quick start assumes that you are using the :ref:`cli` commands, but :ref:`standalone installations ` are also supported. By default, |zk|, |ak-tm|, |sr|, |kconnect-long| REST API, and |kconnect-long| are started with the :litwithvars:`|confluent_start|` command.

- |ak| and |sr| are running locally on the default ports.

.. _redshift-instructions:

----------------------------------
Create an Amazon Redshift instance
----------------------------------

#. Log into your |aws| Management Console.

#. Navigate to `Redshift `__.

   .. warning:: Your account needs permission to create and administer Redshift instances. If you see **User is not authorized to describe clusters**, you will need to contact your account administrator to set up your Redshift cluster.

#. Navigate to Clusters.

#. Click "Quick Launch Cluster".

#. Set the "Master User Password". Remember this password for a later step.

#. Click "Launch Cluster" to complete the setup.

#. Wait for your cluster to be in the "available" state (approximately 5 minutes).

.. note:: You will need the information in the Cluster Configuration screen to complete the connector configuration.

---------------------------------------
Load the Amazon Redshift Sink Connector
---------------------------------------

#. Create a properties file for your Redshift Sink connector.

   ::

      name=redshift-sink
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1
      connector.class=io.confluent.connect.aws.redshift.RedshiftSinkConnector
      tasks.max=1
      topics=orders
      aws.redshift.domain=< Required Configuration >
      aws.redshift.port=< Required Configuration >
      aws.redshift.database=< Required Configuration >
      aws.redshift.user=< Required Configuration >
      aws.redshift.password=< Required Configuration >
      pk.mode=kafka
      auto.create=true

   Fill in the configuration parameters of your cluster as they appear in your `Cluster Details `__.

#. Load the ``redshift-sink`` connector:

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. include:: ../../includes/cli-new.rst

   .. codewithvars:: bash

      |confluent_load| redshift-sink|dash| -d redshift-sink.properties

   Your output should resemble:

   ::

      {
        "name": "redshift-sink",
        "config": {
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "connector.class": "io.confluent.connect.aws.redshift.RedshiftSinkConnector",
          "tasks.max": "1",
          "topics": "orders",
          "aws.redshift.domain": "cluster-name.cluster-id.region.redshift.amazonaws.com",
          "aws.redshift.port": "5439",
          "aws.redshift.database": "dev",
          "aws.redshift.user": "awsuser",
          "aws.redshift.password": "your-password",
          "auto.create": "true",
          "pk.mode": "kafka",
          "name": "redshift-sink"
        },
        "tasks": [],
        "type": "sink"
      }

   .. tip:: For non-CLI users, you can load the Redshift Sink connector with the command below.

      ::

         /bin/connect-standalone \
         /etc/schema-registry/connect-avro-standalone.properties \
         redshift-sink.properties
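The quick start relies on the Avro converter configured in the ``connect-avro-standalone.properties`` worker file referenced in the tip above. The fragment below is a sketch of the relevant converter settings, assuming |sr| is running at its default ``http://localhost:8081`` address; verify these values against your own worker configuration.

::

   # Illustrative converter settings (check your worker file before relying on these).
   key.converter=io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url=http://localhost:8081
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081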
-------------------------
Produce a Record in Kafka
-------------------------

#. Produce a record into the ``orders`` topic.

   ::

      ./bin/kafka-avro-console-producer \
      --broker-list localhost:9092 --topic orders \
      --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price", "type": "float"}]}'

   The console producer waits for input.

#. Copy and paste the following record into the terminal and press **Enter**:

   ::

      {"id": 999, "product": "foo", "quantity": 100, "price": 50}

#. Open the `Query Editor `_ and execute the following query:

   ::

      SELECT * from orders;

Features
--------

------------
Data mapping
------------

The sink connector requires knowledge of schemas, so you should use a suitable converter, such as the Avro converter that comes with |sr| or the JSON converter with schemas enabled. |ak| record keys, if present, can be primitive types or a |kconnect| struct, and the record value must be a |kconnect| struct. Fields selected from |kconnect| structs must be of primitive types. If the data in the topic is not of a compatible format, implementing a custom ``Converter`` or using :ref:`Single Message Transforms (SMTs) ` may be necessary.

------------
Key handling
------------

By default, primary keys are not extracted (``pk.mode`` is set to ``none``). This is not suitable for advanced usage such as upsert semantics, or for cases where the connector is responsible for auto-creating the destination table. There are different modes that enable using fields from the |ak| record key, the |ak| record value, or the |ak| coordinates for the record. Refer to :ref:`primary key configuration options ` for further detail.

-----------
Delete mode
-----------

The connector can delete rows in a database table when it consumes a *tombstone* record, which is a |ak| record that has a non-null key and a null value. This behavior is disabled by default, meaning that any tombstone record results in a failure of the connector, making it easy to upgrade the connector and keep prior behavior. Deletes can be enabled with ``delete.enabled=true``, but only when ``pk.mode`` is set to ``record_key``, because deleting a row from the table requires the primary key to be used as criteria. Enabling delete mode does not affect ``insert.mode``.
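As a hedged sketch, the fragment below shows how the delete and key-handling settings described above fit together. The field name ``id`` is illustrative, and ``pk.fields`` is assumed to behave as in the |kconnect-long| JDBC Sink connector (selecting which key fields to use when the record key is a struct); adjust both to match your record key schema.

::

   # Illustrative only: delete the matching row when a tombstone record arrives.
   # pk.mode must be record_key for delete.enabled=true to be valid.
   pk.mode=record_key
   # Assumption: pk.fields selects key-struct fields, as in the JDBC Sink connector.
   pk.fields=id
   delete.enabled=true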
.. The following is currently unsupported and commented out.

   -----------------
   Idempotent writes
   -----------------

   The default ``insert.mode`` is ``insert``. If it is configured as ``upsert``, the connector will use upsert semantics rather than plain ``INSERT`` statements. Upsert semantics refer to atomically adding a new row or updating the existing row if there is a primary key constraint violation, which provides idempotence.

   If there are failures, the |ak| offset used for recovery may not be up-to-date with what was committed as of the time of the failure, which can lead to re-processing during recovery. The upsert mode is highly recommended as it helps avoid constraint violations or duplicate data if records need to be re-processed. Aside from failure recovery, the source topic may also naturally contain multiple records over time with the same primary key, making upserts desirable.

   Redshift uses the following PostgreSQL-specific DML for upserts: ``INSERT .. ON CONFLICT .. DO UPDATE SET ..``

--------------------------------
Auto-creation and Auto-evolution
--------------------------------

.. tip:: Make sure the JDBC user has the appropriate permissions for DDL.

If ``auto.create`` is enabled, the connector can ``CREATE`` the destination table if it is found to be missing. The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition. Primary keys are specified based on the key :ref:`configuration settings `.

If ``auto.evolve`` is enabled, the connector can perform limited auto-evolution by issuing ``ALTER`` on the destination table when it encounters a record for which a column is found to be missing. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table. Addition of primary key constraints is also not attempted.

For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema, and default values are also specified based on the default value of the corresponding field if applicable. We use the following mapping from |kconnect| schema types to Redshift database types:

+-------------+------------------+
| Schema Type | Redshift         |
+=============+==================+
| INT8        | SMALLINT         |
+-------------+------------------+
| INT16       | SMALLINT         |
+-------------+------------------+
| INT32       | INT              |
+-------------+------------------+
| INT64       | BIGINT           |
+-------------+------------------+
| FLOAT32     | REAL             |
+-------------+------------------+
| FLOAT64     | DOUBLE PRECISION |
+-------------+------------------+
| BOOLEAN     | BOOLEAN          |
+-------------+------------------+
| STRING      | TEXT             |
+-------------+------------------+
| 'Decimal'   | DECIMAL          |
+-------------+------------------+
| 'Date'      | DATE             |
+-------------+------------------+
| 'Time'      | TIME             |
+-------------+------------------+
| 'Timestamp' | TIMESTAMP        |
+-------------+------------------+
| BYTES       | *Not Supported*  |
+-------------+------------------+
| 'Struct'    | *Not Supported*  |
+-------------+------------------+
| 'Map'       | *Not Supported*  |
+-------------+------------------+
| 'Array'     | *Not Supported*  |
+-------------+------------------+

.. important:: For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   sink_config_options
   changelog