.. _netezza_sink:

.. |nzjdbc| replace:: nzjdbc-1.0.jar

Netezza Sink Connector for |cp|
===============================

The |kconnect-long| Netezza sink connector allows you to export data from
|ak-tm| topics to Netezza. The connector polls data from |ak| and writes it
to Netezza based on the topic subscription.

Prerequisites
-------------

The following are required to run the |kconnect-long| Netezza sink connector:

* |ak| Broker: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* |kconnect|: |cp| 4.0.0 or above, or |ak| 1.0.0 or above
* Netezza 7.2.1 or above (tested with Netezza emulator version 7.2.1 running
  on VMware-Player-15)
* Manually installed Netezza JDBC driver 1.0 or above (see
  :ref:`install_netezza_jdbc_driver`)
* Topics containing records with schema information (see :ref:`data_mapping`)

Limitations
-----------

* Inserts into Netezza happen at a very slow rate. Confluent recommends using
  a batch size of 10,000.
* This connector can only insert data into Netezza. Updates are not supported.
* If ``auto.create`` is enabled, the default values for fields are ignored,
  because Netezza does not allow default values for columns.
* If ``auto.evolve`` is enabled, the connector can only add new columns for
  fields that are marked optional. Mandatory fields are not supported, even
  if they have default values.
* Deleting fields is not supported; this includes fields that were previously
  optional. If you need to delete fields, you must manually delete the column
  from the corresponding Netezza table.

Installing the Netezza connector
--------------------------------

.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-netezza:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-netezza:1.0.0

------------------------------
Install the connector manually
------------------------------

`Download and extract the ZIP file `_ for your connector and then follow the
:ref:`manual connector installation instructions `.

.. _install_netezza_jdbc_driver:

-------------------------------
Install the Netezza JDBC driver
-------------------------------

The |kconnect-long| Netezza connector does not come with the Netezza JDBC
driver. If you are running a multi-node |kconnect| cluster, the Netezza
connector and the Netezza JDBC driver JAR (distributed by IBM) must be
installed on **every** |kconnect| worker in the cluster.

Install the |nzjdbc| file on every |kconnect| worker where the connector is
installed:

#. Download the Netezza JDBC driver from the `Maven artifact repository `__.

#. Copy the ``nzjdbc-1.0.jar`` file into the
   ``share/java/kafka-connect-netezza`` directory of your |cp| installation
   **on every worker node**.

#. Restart all of the |kconnect| worker nodes.

.. note::

   The ``share/java/kafka-connect-netezza`` directory mentioned above is for
   |cp|. If you are using a different installation, find the location of the
   Confluent Netezza connector JAR files and place the |nzjdbc| file into the
   same directory.
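If you manage several workers by hand, you can script the copy step. The
following is a minimal sketch, not part of the connector itself; it assumes
SSH access to each worker, hypothetical hostnames (``worker-1`` through
``worker-3``), and |cp| installed under ``/opt/confluent`` on every node:

.. codewithvars:: bash

   # Hypothetical worker hostnames and install path; replace with your own.
   for host in worker-1 worker-2 worker-3; do
     # Copy the driver next to the connector JARs on each worker.
     scp ~/Downloads/nzjdbc-1.0.jar \
       "$host":/opt/confluent/share/java/kafka-connect-netezza/
   done

After copying the driver, restart the |kconnect| worker process on each node
so that the new JAR is picked up.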
License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`netezza-sink-connector-license-config` for license properties and
:ref:`netezza_license-topic-configuration` for information about the license
topic.

Setting Up Netezza
------------------

The Netezza JDBC sink connector attempts to :ref:`create or alter the database
schema ` if the tables do not exist or do not have a schema that reflects the
structure of the records. For this reason, you should set up a Netezza
database user with permissions to execute DDL, and use this user in the
connector.

Features
--------

The Netezza connector offers the following features:

- :ref:`data_mapping`
- :ref:`auto_creation_and_evolution`

.. _data_mapping:

------------
Data mapping
------------

The Netezza sink connector requires knowledge of schemas, so you should use a
suitable converter (for example, the Avro converter that comes with |sr|, or
the JSON converter with schemas enabled). |ak| record keys, if present, can
be primitive types or a |kconnect| struct. The record value must be a
|kconnect| struct, and the fields selected from the struct must be primitive
types. If the data in the topic is not in a compatible format, implementing a
custom converter may be necessary.

.. _auto_creation_and_evolution:

--------------------------------
Auto-creation and Auto-evolution
--------------------------------

If ``auto.create`` is enabled, the connector can *CREATE* the destination
table if it is found to be missing. The creation takes place online as
records are consumed from the topic, since the connector uses the record
schema as the basis for the table definition.

.. note::

   Netezza does not support default values for columns. If your schema has
   fields with default values, the fields are added, but the default values
   are ignored.

If ``auto.evolve`` is enabled, the connector can perform limited
auto-evolution by issuing *ALTER* on the destination table when it encounters
a record for which a column is missing. Because changes to data types and
removal of columns can be dangerous, the connector does not attempt to
perform these evolutions on the table. The connector also does not attempt to
add primary key constraints.

.. important::

   * For backward-compatible table schema evolution, new fields in record
     schemas must be optional. Mandatory fields with a default value are
     **not supported**.
   * If you need to delete a field, the table schema should be manually
     altered to drop the corresponding column. Marking the column nullable
     does not work; you **must** drop the column.
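As an illustration, for the ``orders`` record schema used in the quick start
below (``id``, ``product``, ``quantity``, and ``price``), the table created
by ``auto.create`` would look roughly like the following. This is a
hand-written sketch, not the connector's exact output; the column types,
sizes, and identifier quoting depend on the connector's type mapping and may
differ in practice:

.. codewithvars:: bash

   -- Approximate DDL only; actual types and quoting may differ.
   CREATE TABLE "orders" (
     "id" INTEGER NOT NULL,
     "product" VARCHAR(64000) NOT NULL,
     "quantity" INTEGER NOT NULL,
     "price" REAL NOT NULL
   );

Because every field in the record schema is required, each column is created
``NOT NULL``; as noted above, any default values in the schema would be
ignored.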
Quick Start
-----------

This quick start uses the Netezza JDBC sink connector to copy Avro data from
a single |ak| topic, in a locally running broker, to a Netezza database in an
emulator (also running locally). You first install the Netezza emulator
version 7.2.1 (or later) running on VMware-Player-15 (or later), then start
|cp| locally, and then run the Netezza sink connector.

-------------------------------------------------
Install VMware-Player-15 and the Netezza emulator
-------------------------------------------------

Use the provided links to download `VMware-Player-15 `_ and the `Netezza
Emulator `_.

Complete the following steps to install VMware-Player-15:

#. Change the execution permission on the downloaded VMware-Player file.

   .. codewithvars:: bash

      chmod +x ~/Downloads/VMware-Player

#. Start the VMware-Player installation.

   .. codewithvars:: bash

      sudo sh ~/Downloads/VMware-Player*

#. Proceed through the VMware-Player installation steps by clicking **Next**.

Complete the following steps to install and run the Netezza emulator:

#. Open VMware Player and click **Open a Virtual Machine**. A pop-up message
   directs you to select the ``.ova`` file you downloaded.

#. Select the file and click the import button. VMware imports the emulator
   file and creates a new virtual machine from it.

#. Change the virtual machine memory settings to a minimum of 2 GB.

#. Start the virtual machine and log in to the Netezza appliance with the
   default credentials. This launches the CPU-based community version of
   Netezza and maps it to port 5480 on your localhost. By default, the user
   name is ``admin``, the password is ``password``, and the default database
   is ``SYSTEM``.

---------------
Start Confluent
---------------

Start the Confluent services using the following :ref:`cli` command.

.. include:: ../../includes/cli-new.rst

.. codewithvars:: bash

   |confluent_start|

.. important::

   Do not use the :ref:`cli` in production environments.

---------------
Produce records
---------------

Create a record in the ``orders`` topic.

.. codewithvars:: bash

   $ bin/kafka-avro-console-producer \
     --broker-list localhost:9092 --topic orders \
     --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price", "type": "float"}]}'

The console producer waits for input. Copy and paste the following record
into the terminal:

.. codewithvars:: bash

   {"id": 999, "product": "foo", "quantity": 100, "price": 50}
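Optionally, you can verify that the record reached the topic before starting
the connector. This consumer check is an extra step beyond the quick start
itself, and assumes |sr| is running locally on its default port (8081):

.. codewithvars:: bash

   # Assumes Schema Registry at http://localhost:8081
   $ bin/kafka-avro-console-consumer \
     --bootstrap-server localhost:9092 --topic orders \
     --from-beginning --max-messages 1 \
     --property schema.registry.url=http://localhost:8081

The consumer prints the record you pasted above and then exits.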
----------------------
Property-based example
----------------------

Create a configuration file for the connector. This file is included with the
connector in ``./etc/kafka-connect-netezza/NetezzaSinkConnector.properties``
and contains the following settings:

.. codewithvars:: bash

   name=NetezzaSinkConnector
   connector.class=io.confluent.connect.netezza.NetezzaSinkConnector
   tasks.max=1
   topics=orders
   connection.host=192.168.24.74
   connection.port=5480
   connection.database=SYSTEM
   connection.user=admin
   connection.password=password
   batch.size=10000
   auto.create=true

The first few parameters are common settings that you specify for all
connectors, with the exception of ``topics``, which is specific to sink
connectors like this one. The ``connection.host``, ``connection.port``,
``connection.database``, ``connection.user``, and ``connection.password``
parameters specify the connection details of the local Netezza database.
Because ``auto.create`` is enabled, the connector creates the table if it is
not present. ``batch.size`` is set to ``10000``, the default value; it is
included in the example for clarity.

Run the connector with this configuration.

.. include:: ../../includes/confluent-local-consume-limit.rst

.. codewithvars:: bash

   |confluent_load| NetezzaSinkConnector|dash| -d NetezzaSinkConnector.properties

Confirm that the connector is in a ``RUNNING`` state.

.. codewithvars:: bash

   |confluent_status| NetezzaSinkConnector

------------------
REST-based example
------------------

This configuration is typically used for :ref:`distributed workers `. See the
|kconnect-long| :ref:`REST API ` for more information.

#. Write the following JSON sample code to ``connector.json`` and set all of
   the required parameters.

   .. code-block:: json

      {
        "name": "NetezzaSinkConnector",
        "config": {
          "connector.class": "io.confluent.connect.netezza.NetezzaSinkConnector",
          "tasks.max": "1",
          "topics": "orders",
          "connection.host": "192.168.24.74",
          "connection.port": "5480",
          "connection.database": "SYSTEM",
          "connection.user": "admin",
          "connection.password": "password",
          "batch.size": "10000",
          "auto.create": "true"
        }
      }

#. Use curl to post the configuration to one of the |kconnect-long| workers.
   Change ``http://localhost:8083/`` to the endpoint of one of your
   |kconnect-long| workers.

   To create a new connector:

   .. code-block:: bash

      curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

   To update the configuration of an existing connector:

   .. code-block:: bash

      curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/NetezzaSinkConnector/config

#. To verify the data in Netezza, log in to Netezza and connect to the
   Netezza database with the following command:

   .. codewithvars:: bash

      [nz@netezza ~]$ nzsql

   .. tip::

      The following are several useful Netezza commands: ``\l`` lists all
      databases and their associated users, ``\dt`` lists all tables in the
      current database, ``\dv`` lists all views, and ``\d`` describes a table
      or view.

#. Run the following SQL query to verify the records:

   .. codewithvars:: bash

      SYSTEM.ADMIN(ADMIN)=> select * from orders;
      foo|50.0|100|999

Troubleshooting
---------------

------------------------------------------
SPU virtual machine and NPS will not start
------------------------------------------

The following error message may appear when you run the Netezza emulator with
VMware Fusion:

::

   You need to enable 'Virtualize Intel VT-x/EPT or AMD-V/RVI' option in
   virtual machine processor settings in VMware. Otherwise, SPU virtual
   machine and NPS will not start

Complete the steps below to resolve this issue:

#. Go to **VMware Fusion** > **Virtual Machine** > **Settings** >
   **Processors & Memory** > **Advanced Options**.

#. Turn on **Enable hypervisor applications in this virtual machine**.

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   configuration_options
   changelog