Important
You are viewing documentation for an older version of Confluent Platform.
Netezza Sink Connector for Confluent Platform¶
The Kafka Connect Netezza sink connector allows you to export data from Apache Kafka® topics to Netezza. The connector polls data from Kafka and writes it to Netezza based on its topic subscription.
Prerequisites¶
The following are required to run the Kafka Connect Netezza Sink Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
- Netezza 7.2.1 or above (tested with Netezza emulator version 7.2.1 running on VMware-Player-15).
- Manually installed Netezza JDBC driver 1.0 or above (see Install the Netezza JDBC driver)
- Topics containing records with schema information (see Data mapping)
Limitations¶
- Inserts in Netezza happen at a very slow rate. Confluent recommends using a batch size of 10,000.
- This connector can only insert data into Netezza. Updates are not supported.
- If auto.create is enabled, the default values for fields are ignored. This is because Netezza does not allow default values for columns.
- If auto.evolve is enabled, the connector can only add new columns for fields that are marked optional. Mandatory fields are not supported, even if they have default values.
- Deleting fields is not supported; this includes fields that were previously optional. If you need to delete fields, you must manually delete the column from the corresponding Netezza table.
Installing the Netezza connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
Prerequisite: Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-netezza:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-netezza:1.0.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Install the Netezza JDBC driver¶
The Kafka Connect Netezza connector does not come with the Netezza JDBC driver. If you are running a multi-node Connect cluster, the Netezza connector and Netezza JDBC driver JAR (distributed by IBM) must be installed on every Connect worker in the cluster.
Install the nzjdbc-1.0.jar file on every Connect worker in the cluster where the connector is installed:
- Download the Netezza JDBC driver from the Maven artifact repository.
- After downloading the driver, copy the nzjdbc-1.0.jar file into the share/java/kafka-connect-netezza directory of your Confluent Platform installation on every worker node.
- Restart all of the Connect worker nodes.
Note
The share/java/kafka-connect-netezza directory mentioned above is for Confluent Platform. If you are using a different installation, find the location of the Confluent Netezza connector JAR files and place the nzjdbc-1.0.jar file into the same directory.
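As a rough sketch of the copy step, the shell function below copies the driver JAR into the connector directory and lists the result. The function name install_nz_driver and the CONFLUENT_HOME variable in the example call are assumptions made for illustration; they are not part of Confluent Platform or the connector.

```shell
# Hypothetical helper: copy the Netezza JDBC driver next to the connector JARs.
# Arguments: $1 = path to the downloaded driver JAR, $2 = connector directory.
install_nz_driver() {
  driver_jar="$1"
  plugin_dir="$2"
  if [ ! -f "$driver_jar" ]; then
    echo "driver JAR not found: $driver_jar" >&2
    return 1
  fi
  if [ ! -d "$plugin_dir" ]; then
    echo "connector directory not found: $plugin_dir" >&2
    return 1
  fi
  # Copy the JAR, then list it so the result is visible in the terminal.
  cp "$driver_jar" "$plugin_dir/" && ls -l "$plugin_dir/$(basename "$driver_jar")"
}

# Example call (both paths are assumptions; adjust them to your installation):
# install_nz_driver ~/Downloads/nzjdbc-1.0.jar "$CONFLUENT_HOME/share/java/kafka-connect-netezza"
```

Run it on every worker node, then restart the Connect workers as described above.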
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Netezza Sink Connector Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Setting Up Netezza¶
The Netezza JDBC sink connector attempts to create or alter the database schema if the tables do not exist or do not have a schema that reflects the structure of the records. For this reason, you should set up a Netezza database user with permissions to execute DDL. Use this Netezza database user in the connector.
Features¶
The Netezza connector offers the following features:
Data mapping¶
The Netezza sink connector requires knowledge of schemas, so you should use a suitable converter (for example, the Avro converter that comes with Schema Registry or the JSON converter with schemas enabled). Kafka record keys (if present) can be primitive types or a Connect struct. The record value must be a Connect struct. Fields being selected from Connect structs must be primitive types. If the data in the topic is not in a compatible format, implementing a custom converter may be necessary.
Auto-creation and Auto-evolution¶
If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing. The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition.
Note
Netezza does not support default values for columns. If your schema has fields with default values, they will be added, but the default value will be ignored.
If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is missing.
Since changes to data-types and removing columns can be dangerous, the connector does not attempt to perform these evolutions on the table. The connector also does not attempt to add primary key constraints.
Important
- For backwards-compatible table schema evolution, new fields in record schemas must be optional. Mandatory fields with a default value are not supported.
- If you need to delete a field, the table schema should be manually altered to drop the corresponding column. Marking the column nullable does not work. You must drop the corresponding column.
Quick Start¶
This quick start uses the Netezza JDBC sink connector to copy Avro data from a single Kafka topic, in a locally running broker, to a Netezza database in an emulator (also running locally).
You first install the Netezza emulator version 7.2.1 (or later) running on VMware-Player-15 (or later), then start Confluent Platform locally, and then run the Netezza sink connector.
Install VMware-Player-15 and the Netezza emulator¶
Use the provided links to download VMware-Player-15 and the Netezza Emulator.
Complete the following steps to install VMware-Player-15:
Change the execution permission on the downloaded VMware-Player file.
chmod +x ~/Downloads/VMware-Player*
Start the VMware-Player installation.
sudo sh ~/Downloads/VMware-Player*
Proceed through the VMware-Player installation steps by clicking Next.
Complete the following steps to install and run the Netezza emulator:
- Open VMware Player and click Open a Virtual Machine. A pop-up message directs you to select the .ova file you downloaded.
- Select the file and click the import button. VMware imports and creates a new virtual machine from the emulator file.
- Change the virtual machine memory setting to a minimum of 2 GB.
- Start the virtual machine and log in to the Netezza appliance with the default credentials. This launches the CPU-based community version of Netezza and maps it to port 5480 on your localhost. By default, the user name is admin and the password is password. The default database is SYSTEM.
Start Confluent¶
Start the Confluent services using the following Confluent CLI command.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.
confluent local start
Important
Do not use the Confluent CLI in production environments.
Produce records¶
Create a record in the orders topic.
bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'
The console producer will wait for input. Copy and paste the following record into the terminal:
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
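To load more than one record, a small shell loop can generate sample values that match the value schema above. This is only a sketch: the function name gen_orders and the generated values are made up for illustration.

```shell
# Hypothetical helper: print N sample order records, one JSON object per line.
# The field names follow the value schema used in this quick start.
gen_orders() {
  count="$1"
  i=1
  while [ "$i" -le "$count" ]; do
    # id = i, product = item-i, quantity = i * 10, price = i.5
    printf '{"id": %d, "product": "item-%d", "quantity": %d, "price": %d.5}\n' \
      "$i" "$i" "$((i * 10))" "$i"
    i=$((i + 1))
  done
}

# Example: print three records.
# gen_orders 3
```

Because the console producer reads records from standard input, the output of gen_orders can be piped into the kafka-avro-console-producer command shown above instead of pasting records by hand.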
Property-based example¶
Create a configuration file for the connector. This file is included with the connector in ./etc/kafka-connect-netezza/NetezzaSinkConnector.properties and contains the following settings:
name=NetezzaSinkConnector
connector.class=io.confluent.connect.netezza.NetezzaSinkConnector
tasks.max=1
topics=orders
connection.host=192.168.24.74
connection.port=5480
connection.database=SYSTEM
connection.user=admin
connection.password=password
batch.size=10000
auto.create=true
The first few parameters are common settings that you specify for all connectors, with the exception of topics, which is specific to sink connectors like this one.
The connection.host, connection.port, connection.database, connection.user, and connection.password properties specify the host, port, database, user name, and password for the local Netezza database. Since auto.create is enabled, the connector creates the table if it is not present. The batch.size property is set to 10000, which is the default value; it is included in the configuration example for clarity.
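Before loading the connector, it can help to confirm that the properties file contains the keys used in this quick start. The function below is a minimal sketch of such a check; the name check_props is hypothetical, and the key list simply mirrors the example file above rather than an authoritative list of required properties.

```shell
# Hypothetical helper: report quick-start keys that are missing from a
# properties file. Returns 0 if all keys are present, 1 otherwise.
check_props() {
  props_file="$1"
  missing=0
  for key in name connector.class topics connection.host connection.port \
             connection.database connection.user connection.password; do
    # Compare the text before the first "=" exactly against the key name.
    if ! awk -F= -v k="$key" '$1 == k { found = 1 } END { exit !found }' "$props_file"; then
      echo "missing: $key" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example call (the file name is the one used in this quick start):
# check_props NetezzaSinkConnector.properties && echo "all quick-start keys present"
```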
Run the connector with this configuration.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local load NetezzaSinkConnector -- -d NetezzaSinkConnector.properties
Confirm that the connector is in a RUNNING state.
confluent local status NetezzaSinkConnector
REST-based example¶
This configuration is typically used for distributed workers. See the Kafka Connect REST API for more information.
- Write the following JSON sample code to connector.json and set all of the required parameters.
{
  "name": "NetezzaSinkConnector",
  "config": {
    "connector.class": "io.confluent.connect.netezza.NetezzaSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.host": "192.168.24.74",
    "connection.port": "5480",
    "connection.database": "SYSTEM",
    "connection.user": "admin",
    "connection.password": "password",
    "batch.size": "10000",
    "auto.create": "true"
  }
}
- Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
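To update the configuration of an existing connector instead, use PUT. This endpoint expects only the contents of the config object, without the name and config wrapper, so save that object to a separate file first (the file name connector-config.json is illustrative):
curl -s -X PUT -H 'Content-Type: application/json' --data @connector-config.json http://localhost:8083/connectors/NetezzaSinkConnector/config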
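In a distributed setup, the connector state can also be checked through the Kafka Connect REST API, whose GET /connectors/{name}/status endpoint returns a JSON document with a state for the connector and for each task. The sketch below reads such a document from standard input and succeeds only if every state is RUNNING. The function name all_running is hypothetical, and the text matching is a rough approach; a real JSON parser would be more robust.

```shell
# Hypothetical helper: read a Connect status document on stdin and succeed
# only if at least one "state" value exists and all of them are RUNNING.
all_running() {
  # Extract every "state":"VALUE" pair, then keep only VALUE.
  states=$(grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' \
    | sed 's/.*"\([A-Z_]*\)"$/\1/')
  [ -n "$states" ] || return 1
  for s in $states; do
    [ "$s" = "RUNNING" ] || return 1
  done
  return 0
}

# Example call (assumes a worker at localhost:8083, as in this quick start):
# curl -s http://localhost:8083/connectors/NetezzaSinkConnector/status | all_running \
#   && echo "connector and tasks are RUNNING"
```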
To verify the data in Netezza, log in to Netezza and connect to the Netezza database with the following command:
[nz@netezza ~]$ nzsql
Tip
The following are several important Netezza commands:
- \l displays all databases present with associated users.
- \dt displays all tables in the present database.
- \dt and \dv list tables or views.
- \d describes a table or view.
Run the following SQL query to verify the records:
SYSTEM.ADMIN(ADMIN)=> select * from orders;
foo|50.0|100|999
Troubleshooting¶
SPU virtual machine and NPS will not start¶
The following error message may appear when you run a Netezza emulator with VMware Fusion:
You need to enable 'Virtualize Intel VT-x/EPT or AMD-V/RVI' option in
virtual machine processor settings in VMware. Otherwise, SPU virtual machine
and NPS will not start
Complete the steps below to resolve this issue:
- Go to VMware Fusion > Virtual Machine > Settings > Processors & Memory > Advanced Options.
- Turn on Enable hypervisor applications in this virtual machine.