Netezza Sink Connector for Confluent Platform¶
The Kafka Connect Netezza Sink connector allows you to export data from Apache Kafka® topics to Netezza. The connector polls data from Kafka to write to Netezza based on the topics subscription.
Features¶
The Netezza Sink connector includes the following features:
At least once delivery¶
This connector guarantees that records from the Kafka topic are delivered at least once.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Data mapping¶
The Netezza sink connector requires knowledge of schemas, so you should use a suitable converter (for example: the Avro converter that comes with Schema Registry or the JSON converter with schemas enabled). Kafka record keys (if present) can be primitive types or a Connect struct. The record value must be a Connect struct. Fields being selected from Connect structs must be primitive types. If the data in the topic is not in a compatible format, implementing a custom converter may be necessary.
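For example, when using Avro with Schema Registry, the Connect worker (or connector) configuration typically includes converter settings such as the following. This is only a sketch; the Schema Registry URL assumes a default local installation:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081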
Auto-creation and auto-evolution¶
If auto.create is enabled, the connector can create the destination table if it is found to be missing. The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition.
Note
Netezza does not support default values for columns. If your schema has fields with default values, they will be added, but the default value will be ignored.
If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing.
Because changes to data types and removing columns can be dangerous, the connector does not attempt to perform these evolutions on the table. The connector also does not attempt to add primary key constraints.
Important
- For backward-compatible table schema evolution, new fields in record schemas must be optional (see the schema sketch after this list). Mandatory fields with a default value are not supported.
- If you need to delete a field, you must manually alter the table schema to drop the corresponding column; marking the column nullable is not sufficient.
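As an illustration, an optional field can be declared in an Avro value schema as a union with null and a null default. This is only a sketch; the field name note is hypothetical:
{"name": "note", "type": ["null", "string"], "default": null}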
Limitations¶
- Inserts in Netezza happen at a very slow rate. Confluent recommends using a batch size of 10,000.
- This connector can only insert data into Netezza. Updates are not supported.
- If auto.create is enabled, the default values for fields are ignored because Netezza does not allow default values for columns.
- If auto.evolve is enabled, the connector can only add new columns for fields that are marked optional. Mandatory fields are not supported, even if they have default values.
- Deleting fields is not supported; this includes fields that were previously optional. If you need to delete a field, you must manually drop the corresponding column from the Netezza table.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription. Confluent issues enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see the Netezza Sink connector license configuration properties. For information about the license topic, see License topic configuration.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Netezza Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the Netezza Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
- Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
- Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.
- Netezza 7.2.1 or later (tested with the Netezza emulator version 7.2.1 running on VMware-Player-15).
- A manually installed Netezza JDBC driver 1.0 or later. For more details, see Install the Netezza JDBC driver.
- Topics containing records with schema information. For more details, see Data mapping.
- An installation of the latest (latest) connector version.

To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-netezza:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-netezza:1.0.1
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Install the Netezza JDBC driver¶
The Kafka Connect Netezza connector does not come with the Netezza JDBC driver. If you are running a multi-node Connect cluster, the Netezza connector and Netezza JDBC driver JAR (distributed by IBM) must be installed on every Connect worker in the cluster.
Install the nzjdbc-1.0.jar file on every Connect worker in the cluster where the connector is installed:
Download the Netezza JDBC driver from the Maven artifact repository.
After downloading the driver, copy the nzjdbc-1.0.jar file into the share/java/kafka-connect-netezza directory of your Confluent Platform installation on every worker node.

Restart all of the Connect worker nodes.
Note
The share/java/kafka-connect-netezza directory mentioned above is for Confluent Platform. If you are using a different installation, find the location of the Confluent Netezza connector JAR files and place the nzjdbc-1.0.jar file in the same directory.
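The following command sketches the copy step for a single worker; the download location and the CONFLUENT_HOME installation path are assumptions for your environment:

# Copy the IBM Netezza JDBC driver next to the connector JARs, then restart the worker
cp ~/Downloads/nzjdbc-1.0.jar $CONFLUENT_HOME/share/java/kafka-connect-netezza/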
Setting Up Netezza¶
The Netezza JDBC sink connector attempts to create or alter the database schema if the tables do not exist or do not have a schema that reflects the structure of the records. For this reason, you should set up a Netezza database user with permissions to execute DDL, and use this Netezza database user in the connector.
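As a sketch, you could create a dedicated user and grant it table-creation rights with nzsql; the user name and password below are hypothetical, and the exact privileges to grant should follow your Netezza administration guide:

nzsql -u admin -pw password -d SYSTEM -c "CREATE USER connect_user WITH PASSWORD 'connect_password';"
nzsql -u admin -pw password -d SYSTEM -c "GRANT CREATE TABLE TO connect_user;"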
Quick Start¶
This quick start uses the Netezza JDBC sink connector to copy Avro data from a single Kafka topic, in a locally running broker, to a Netezza database in an emulator (also running locally).
You first install the Netezza emulator version 7.2.1 (or later) running on VMware-Player-15 (or later), then start Confluent Platform locally, and then run the Netezza sink connector.
Install VMware-Player-15 and the Netezza emulator¶
Use the provided links to download VMware-Player-15 and the Netezza Emulator.
Complete the following steps to install VMware-Player-15:
Change the execution permission on the downloaded VMware-Player file.
chmod +x ~/Downloads/VMware-Player
Start the VMware-Player installation.
sudo sh ~/Downloads/VMware-Player*
Proceed with the VMware-Player installation steps by clicking Next.
Complete the following steps to install and run the Netezza emulator:
- Open VMware Player and click Open a Virtual Machine. A pop-up message directs you to select the .ova file you downloaded.
- Select the file and click the Import button. VMware imports and creates a new virtual machine from the emulator file.
- Change the virtual machine memory settings to a minimum of 2 GB.
- Start the virtual machine and log in to the Netezza appliance with the default credentials. This launches the CPU-based community version of Netezza and maps it to port 5480 on your localhost. By default, the user name is admin and the password is password. The default database is SYSTEM.
Start Confluent¶
Start the Confluent services using the following Confluent CLI command.
confluent local services start
Important
Do not use the Confluent CLI in production environments.
Produce records¶
Create a record in the orders topic.
bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price", "type": "float"}]}'
The console producer will wait for input. Copy and paste the following record into the terminal:
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
Property-based example¶
Create a configuration file for the connector. This file is included with the connector in ./etc/kafka-connect-netezza/NetezzaSinkConnector.properties and contains the following settings:
name=NetezzaSinkConnector
connector.class=io.confluent.connect.netezza.NetezzaSinkConnector
tasks.max=1
topics=orders
connection.host=192.168.24.74
connection.port=5480
connection.database=SYSTEM
connection.user=admin
connection.password=password
batch.size=10000
auto.create=true
The first few parameters are common settings that you specify for all connectors, with the exception of topics, which is specific to sink connectors like this one. The connection.host, connection.port, connection.database, connection.user, and connection.password parameters specify the host, port, database, username, and password of the local Netezza database. Since auto.create is enabled, the connector creates the table if it is not present. Batch size is set to batch.size=10000 (the default value). Even though batch.size is set to the default value, it is included in the configuration example for clarity.
Run the connector with this configuration.
Caution
You must include a double dash (--) between the topic name and your flag. For more information, see this post.
confluent local services connect connector load NetezzaSinkConnector --config NetezzaSinkConnector.properties
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status NetezzaSinkConnector
REST-based example¶
This configuration is typically used for distributed workers. See the Kafka Connect REST API documentation for more information.
Write the following JSON sample code to connector.json and set all of the required parameters.

{
  "name": "NetezzaSinkConnector",
  "config": {
    "connector.class": "io.confluent.connect.netezza.NetezzaSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.host": "192.168.24.74",
    "connection.port": "5480",
    "connection.database": "SYSTEM",
    "connection.user": "admin",
    "connection.password": "password",
    "batch.size": "10000",
    "auto.create": "true"
  }
}
Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

Alternatively, use a PUT request to create or update the connector configuration:

curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/NetezzaSinkConnector/config
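To check that the connector started successfully, you can query the worker's standard status endpoint:

curl -s http://localhost:8083/connectors/NetezzaSinkConnector/status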
To verify the data in Netezza, log in to Netezza and connect to the Netezza database with the following command:
[nz@netezza ~]$nzsql
Tip
The following are several important Netezza commands:
- \l displays all databases present with associated users.
- \dt displays all tables in the present database.
- \dt and \dv list tables or views.
- \d describes a table or view.

Run the following SQL query to verify the records:

SYSTEM.ADMIN(ADMIN)=> select * from orders;
 foo|50.0|100|999
Troubleshooting¶
SPU virtual machine and NPS will not start¶
The following error message may appear when you run a Netezza emulator with VMware Fusion:
You need to enable 'Virtualize Intel VT-x/EPT or AMD-V/RVI' option in virtual
machine processor settings in VMware. Otherwise, SPU virtual machine and NPS
will not start.
Complete the steps below to resolve this issue:
- Go to VMware Fusion > Virtual Machine > Settings > Processors & Memory > Advanced Options.
- Turn on Enable hypervisor applications in this virtual machine.