Netezza Sink Connector for Confluent Platform

The Kafka Connect Netezza Sink connector allows you to export data from Apache Kafka® topics to Netezza. The connector polls data from Kafka and writes it to Netezza based on the topic subscription.

Features

The Netezza Sink connector includes the following features:

At least once delivery

This connector guarantees that records from the Kafka topic are delivered at least once.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Data mapping

The Netezza Sink connector requires knowledge of schemas, so you should use a suitable converter (for example, the Avro converter that comes with Schema Registry or the JSON converter with schemas enabled). Kafka record keys (if present) can be primitive types or a Connect struct. The record value must be a Connect struct. Fields being selected from Connect structs must be primitive types. If the data in the topic is not in a compatible format, implementing a custom converter may be necessary.
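As a rough illustration of what "JSON converter with schemas enabled" means on the wire, the sketch below builds a message in the schema/payload envelope that Kafka's JsonConverter reads when schemas.enable is set to true. The field names are borrowed from the quick start's orders topic, and the helper name to_envelope is hypothetical; treat this as an example of the format, not as part of the connector.

```python
import json

# Connect schema in the form used by the JSON converter with schemas enabled.
# Field names mirror the quick start's "orders" topic; the schema name is a placeholder.
ORDER_SCHEMA = {
    "type": "struct",
    "name": "myrecord",
    "optional": False,
    "fields": [
        {"field": "id", "type": "int32", "optional": False},
        {"field": "product", "type": "string", "optional": False},
        {"field": "quantity", "type": "int32", "optional": False},
        {"field": "price", "type": "float", "optional": False},
    ],
}


def to_envelope(payload: dict) -> str:
    """Wrap a record value in the schema/payload envelope (hypothetical helper)."""
    return json.dumps({"schema": ORDER_SCHEMA, "payload": payload})


# The record value is a struct whose fields are all primitive types.
message = to_envelope({"id": 999, "product": "foo", "quantity": 100, "price": 50.0})
print(message)
```

Every field in the struct is a primitive type, which matches the requirement stated above for fields selected from Connect structs.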

Auto-creation and auto-evolution

If auto.create is enabled, the connector can create the destination table if it is found to be missing. The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition.

Note

Netezza does not support default values for columns. If your schema has fields with default values, they will be added, but the default value will be ignored.

If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record with a field for which the table has no column.

Because changes to data types and removing columns can be dangerous, the connector does not attempt to perform these evolutions on the table. The connector also does not attempt to add primary key constraints.

Important

  • For backward-compatible table schema evolution, new fields in record schemas must be optional. Mandatory fields with a default value are not supported.
  • If you need to delete a field, you must manually alter the table schema to drop the corresponding column. Marking the column nullable does not work.
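The rule about optional fields can be checked before a new schema is rolled out. The following is a minimal sketch, assuming Avro record schemas in which an optional field is declared as a union that includes "null". The function names and sample schemas are hypothetical, and the connector's own internal check may differ.

```python
def is_optional(avro_field: dict) -> bool:
    """Treat an Avro field as optional when its type is a union containing "null"."""
    field_type = avro_field["type"]
    return isinstance(field_type, list) and "null" in field_type


def unsupported_new_fields(old_schema: dict, new_schema: dict) -> list:
    """Return the names of fields added in new_schema that are not optional."""
    old_names = {f["name"] for f in old_schema["fields"]}
    return [
        f["name"]
        for f in new_schema["fields"]
        if f["name"] not in old_names and not is_optional(f)
    ]


old = {"type": "record", "name": "myrecord", "fields": [
    {"name": "id", "type": "int"},
    {"name": "product", "type": "string"},
]}
new = {"type": "record", "name": "myrecord", "fields": [
    {"name": "id", "type": "int"},
    {"name": "product", "type": "string"},
    # Optional field: can be added as a new column.
    {"name": "note", "type": ["null", "string"], "default": None},
    # Mandatory field with a default value: not supported for auto-evolution.
    {"name": "discount", "type": "float", "default": 0.0},
]}

print(unsupported_new_fields(old, new))  # ['discount']
```

A non-empty result means the new schema contains at least one added field that auto-evolution would not be able to handle.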

Limitations

  • Inserts in Netezza happen at a very slow rate. Confluent recommends using a batch size of 10,000.
  • This connector can only insert data into Netezza. Updates are not supported.
  • If auto.create is enabled, the default values for fields are ignored. This is because Netezza does not allow default values for columns.
  • If auto.evolve is enabled, the connector can only add new columns for fields that are marked optional. Mandatory fields are not supported, even if they have default values.
  • Deleting fields is not supported; this includes fields that were previously optional. If you need to delete fields, you must manually delete the column from the corresponding Netezza table.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

For license properties, see the license configuration properties for this connector. For information about the license topic, see License topic configuration.

Configuration Properties

For a complete list of configuration properties for this connector, see Netezza Sink Connector Configuration Properties.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Install the Netezza Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.

  • Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.

  • Netezza 7.2.1 or later (tested with Netezza emulator version 7.2.1 running on VMware-Player-15).

  • Manually installed Netezza JDBC driver 1.0 or later. For more details, see Install the Netezza JDBC driver.

  • Topics containing records with schema information. For more details, see Data mapping.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-netezza:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-netezza:1.0.1
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Install the Netezza JDBC driver

The Kafka Connect Netezza connector does not come with the Netezza JDBC driver. If you are running a multi-node Connect cluster, the Netezza connector and Netezza JDBC driver JAR (distributed by IBM) must be installed on every Connect worker in the cluster.

Install the nzjdbc-1.0.jar file on every Connect worker in the cluster where the connector is installed:

  1. Download the Netezza JDBC driver from the Maven artifact repository.

  2. After downloading the driver, copy the nzjdbc-1.0.jar file into the share/java/kafka-connect-netezza directory of your Confluent Platform installation on every worker node.

  3. Restart all of the Connect worker nodes.

    Note

    The share/java/kafka-connect-netezza directory mentioned above is for Confluent Platform. If you are using a different installation, find the location of the Confluent Netezza connector JAR files and place the nzjdbc-1.0.jar file into the same directory.
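Because a missing driver JAR only shows up once the connector starts, it can help to confirm the copy on each worker first. The sketch below is one way to do that. It assumes it is run from the Confluent Platform installation directory, and the function name find_driver_jars is hypothetical.

```python
from pathlib import Path


def find_driver_jars(plugin_dir: str, pattern: str = "nzjdbc*.jar") -> list:
    """Return the names of Netezza JDBC driver JARs found directly in plugin_dir."""
    directory = Path(plugin_dir)
    if not directory.is_dir():
        return []
    return sorted(p.name for p in directory.glob(pattern))


# Path relative to the Confluent Platform installation directory, as in the steps above.
jars = find_driver_jars("share/java/kafka-connect-netezza")
print(jars if jars else "Netezza JDBC driver not found in the plugin directory")
```

For a different installation layout, pass the directory that holds the connector JAR files instead.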

Setting Up Netezza

The Netezza JDBC sink connector attempts to create or alter the database schema if the tables do not exist or do not have a schema that reflects the structure of the records. For this reason, you should set up a Netezza database user with permissions to execute DDL. Use this Netezza database user in the connector.

Quick Start

This quick start uses the Netezza JDBC sink connector to copy Avro data from a single Kafka topic, in a locally running broker, to a Netezza database in an emulator (also running locally).

You first install the Netezza emulator version 7.2.1 (or later) running on VMware-Player-15 (or later), then start Confluent Platform locally, and then run the Netezza sink connector.

Install VMware-Player-15 and the Netezza emulator

Use the provided links to download VMware-Player-15 and the Netezza Emulator.

Complete the following steps to install VMware-Player-15:

  1. Make the downloaded VMware-Player file executable.

    chmod +x ~/Downloads/VMware-Player*
    
  2. Start the VMware-Player installation.

    sudo sh ~/Downloads/VMware-Player*
    
  3. Proceed through the VMware-Player installation steps by clicking Next.

Complete the following steps to install and run the Netezza emulator:

  1. Open VMware Player and click Open a Virtual Machine. A pop-up message directs you to select the .ova file you downloaded.
  2. Select the file and click Import. VMware imports the file and creates a new virtual machine from the emulator file.
  3. Change the virtual machine memory setting to at least 2 GB.
  4. Start the virtual machine and log in to the Netezza appliance with the default credentials. This launches the CPU-based community version of Netezza and maps it to port 5480 on your localhost. By default, the user name is admin and the password is password. The default database is SYSTEM.

Start Confluent

Start the Confluent services using the following Confluent CLI command.

confluent local services start

Important

Do not use the Confluent CLI in production environments.

Produce records

Create a record in the orders topic.

bin/kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price",
 "type": "float"}]}'

The console producer will wait for input. Copy and paste the following record into the terminal:

{"id": 999, "product": "foo", "quantity": 100, "price": 50}
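To produce more than one record, additional lines in the same shape can be generated and pasted into the console producer. The sketch below prints a few JSON records that follow the value schema above. The product names and value ranges are made-up sample data, and make_order is a hypothetical helper.

```python
import json
import random


def make_order(order_id: int, rng: random.Random) -> dict:
    """Build one record matching the quick start's value schema (sample data only)."""
    return {
        "id": order_id,
        "product": rng.choice(["foo", "bar", "baz"]),
        "quantity": rng.randint(1, 100),
        "price": round(rng.uniform(1.0, 100.0), 2),
    }


rng = random.Random(42)  # fixed seed so repeated runs print the same records
lines = [json.dumps(make_order(i, rng)) for i in range(1, 4)]
print("\n".join(lines))
```

Each printed line is one record; the console producer reads one record per line.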

Property-based example

Create a configuration file for the connector. This file is included with the connector in ./etc/kafka-connect-netezza/NetezzaSinkConnector.properties and contains the following settings:

name=NetezzaSinkConnector
connector.class=io.confluent.connect.netezza.NetezzaSinkConnector
tasks.max=1
topics=orders
connection.host=192.168.24.74
connection.port=5480
connection.database=SYSTEM
connection.user=admin
connection.password=password
batch.size=10000
auto.create=true

The first few parameters are common settings that you specify for all connectors, with the exception of topics which is specific to sink connectors like this one.

The connection.host, connection.port, connection.database, connection.user, and connection.password properties specify the host, port, database, username, and password used to connect to the local Netezza database. Since auto.create is enabled, the connector creates the table if it is not present. The batch.size property is set to 10000, which is the default value; it is included in the configuration example for clarity.
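The same settings are used in the REST-based example later in this document, where the connector name sits at the top level and every other property goes under "config". The sketch below shows that conversion. The parser is deliberately simplified (plain key=value lines only, not the full Java properties format), and both function names are hypothetical.

```python
import json


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def to_rest_payload(props: dict) -> dict:
    """Move "name" to the top level and place all other properties under "config"."""
    config = {k: v for k, v in props.items() if k != "name"}
    return {"name": props["name"], "config": config}


PROPERTIES = """\
name=NetezzaSinkConnector
connector.class=io.confluent.connect.netezza.NetezzaSinkConnector
tasks.max=1
topics=orders
connection.host=192.168.24.74
connection.port=5480
connection.database=SYSTEM
connection.user=admin
connection.password=password
batch.size=10000
auto.create=true
"""

payload = to_rest_payload(parse_properties(PROPERTIES))
print(json.dumps(payload, indent=2))
```

All values stay strings, which is how the REST-based example writes them as well.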

Run the connector with this configuration.

Caution

You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local services connect connector load NetezzaSinkConnector --config NetezzaSinkConnector.properties

Confirm that the connector is in a RUNNING state.

confluent local services connect connector status NetezzaSinkConnector
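The same check can be scripted against the Kafka Connect REST API, which reports a state for the connector and for each task. The sketch below assumes a worker listening on http://localhost:8083; the function names are hypothetical, and the sample response is trimmed to the fields the check reads.

```python
import json
import urllib.request


def fetch_status(connector: str, base_url: str = "http://localhost:8083") -> dict:
    """GET /connectors/<name>/status from a Connect worker (base_url is an assumption)."""
    with urllib.request.urlopen(f"{base_url}/connectors/{connector}/status") as resp:
        return json.load(resp)


def is_healthy(status: dict) -> bool:
    """Return True when the connector and every listed task report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


# Sample response shape, trimmed to the fields used above.
sample = {
    "name": "NetezzaSinkConnector",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
}
print(is_healthy(sample))  # True
```

Against a live worker, the call would be is_healthy(fetch_status("NetezzaSinkConnector")). Checking the task states matters because a connector can report RUNNING while one of its tasks has failed.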

REST-based example

This configuration is typically used for distributed workers. See the Kafka Connect REST API for details.

  1. Write the following JSON sample code to connector.json and set all of the required parameters.

    {
      "name": "NetezzaSinkConnector",
      "config":{
            "connector.class": "io.confluent.connect.netezza.NetezzaSinkConnector",
            "tasks.max": "1",
            "topics": "orders",
            "connection.host": "192.168.24.74",
            "connection.port": "5480",
            "connection.database": "SYSTEM",
            "connection.user": "admin",
            "connection.password": "password",
            "batch.size": "10000",
            "auto.create": "true"
            }
    }
    
  2. Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

    curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

    # To update an existing connector, PUT only the inner "config" object (assumes jq is installed):
    jq .config connector.json | curl -s -X PUT -H 'Content-Type: application/json' --data @- http://localhost:8083/connectors/NetezzaSinkConnector/config
    
  3. To verify the data in Netezza, log in to Netezza and connect to the Netezza database with the following command:

    [nz@netezza ~]$ nzsql
    

    Tip

    The following are several useful nzsql commands: \l displays all databases with their associated users. \dt lists the tables and \dv lists the views in the current database. \d describes a table or view.

  4. Run the following SQL query to verify the records:

    SYSTEM.ADMIN(ADMIN)=> select * from orders;
     foo|50.0|100|999
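The POST from step 2 can also be issued from a script instead of curl. The following is a minimal sketch using only the Python standard library. It assumes the worker endpoint http://localhost:8083 and the connector.json file from step 1; both function names are hypothetical.

```python
import json
import urllib.request


def build_create_request(
    payload: dict, base_url: str = "http://localhost:8083"
) -> urllib.request.Request:
    """Build a POST /connectors request carrying the connector name and config."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(path: str = "connector.json") -> dict:
    """Read the JSON file from step 1, send it, and return the worker's JSON response."""
    with open(path, encoding="utf-8") as f:
        payload = json.load(f)
    with urllib.request.urlopen(build_create_request(payload)) as resp:
        return json.load(resp)
```

Calling create_connector() sends the request. urllib raises urllib.error.HTTPError for non-2xx responses, for example when a connector with the same name already exists.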
    

Troubleshooting

SPU virtual machine and NPS will not start

The following error message may appear when you run a Netezza emulator with VMware Fusion:

You need to enable 'Virtualize Intel VT-x/EPT or AMD-V/RVI' option in virtual
machine processor settings in VMware. Otherwise, SPU virtual machine and NPS
will not start.

Complete the steps below to resolve this issue:

  1. Go to VMware Fusion > Virtual Machine > Settings > Processors & Memory > Advanced Options.
  2. Turn on Enable hypervisor applications in this virtual machine.