HEAVY-AI (Formerly OmniSci) Sink Connector for Confluent Platform¶
The Kafka Connect HEAVY-AI Sink connector allows you to export data from Apache Kafka® topics to HEAVY-AI. The connector polls data from Kafka and writes it to HEAVY-AI based on the topic subscription.
Features¶
The HEAVY-AI Sink connector includes the following features:
At least once delivery¶
This connector guarantees that records from the Kafka topic are delivered at least once.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
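As a sketch, the DLQ is configured through the standard Kafka Connect error-handling properties in the connector configuration; the topic name below is an example value:
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-omnisci-sink
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true
With errors.tolerance=all, records that cannot be processed are written to the DLQ topic instead of failing the task.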
Multiple tasks¶
The HEAVY-AI Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance when the connector consumes from multiple topic partitions in parallel.
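For example, the following connector setting runs three tasks (the value 3 is arbitrary for illustration; tasks beyond the number of topic partitions remain idle):
tasks.max=3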
Limitations¶
- This connector can only insert data into HEAVY-AI. Updates are not supported.
- If auto.create is enabled, default values for fields are ignored, because HEAVY-AI does not allow default values for columns.
- If auto.evolve is enabled, the connector can only add new columns for fields that are marked optional. Mandatory fields are not supported, even if they have default values.
- Deletion of fields is not supported, including fields that were previously optional. If you must delete fields, manually delete the columns from the corresponding HEAVY-AI table (see the example after this list).
- This connector cannot alter the type of an existing column.
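For example, assuming your HEAVY-AI version supports dropping columns, an obsolete column could be removed manually with a statement such as the following (the table and column names are illustrative):
ALTER TABLE orders DROP COLUMN quantity;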
Install the HEAVY-AI Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- An installation of the latest (latest) connector version.
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-omnisci:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-omnisci:1.0.2
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for HEAVY-AI (Formerly OmniSci) Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Quick Start¶
In this quickstart, you copy Avro data from a single topic to a local HEAVY-AI database running in Docker.
This example assumes you are running Kafka and Schema Registry locally on the default ports. It also assumes you have Docker installed and running.
First, bring up the HEAVY-AI database by running the following Docker command:
docker run -d -p 6274:6274 omnisci/core-os-cpu:v4.7.0
This starts the CPU-based community version of HEAVY-AI and maps it to port 6274 on localhost. By default, the user name is admin and the password is HyperInteractive. The default database is omnisci.
Start the Confluent Platform using the Confluent CLI command below.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services start
Property-based example¶
Next, create a configuration file for the connector. This configuration is typically used with standalone workers. The file is included with the connector in ./etc/kafka-connect-omnisci/omnisci-sink-connector.properties and contains the following settings:
name=OmnisciSinkConnector
connector.class=io.confluent.connect.omnisci.OmnisciSinkConnector
tasks.max=1
topics=orders
connection.database=omnisci
connection.port=6274
connection.host=localhost
connection.user=admin
connection.password=HyperInteractive
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
auto.create=true
The first few settings are common settings that you specify for all connectors, except for topics, which is specific to sink connectors like this one.
The connection.host, connection.port, connection.database, connection.user, and connection.password settings specify the connection details for the local HEAVY-AI database. Since auto.create is enabled, the connector creates the table if it is not present.
Run the connector with this configuration.
confluent load OmnisciSinkConnector -d etc/kafka-connect-omnisci/omnisci-sink-connector.properties
REST-based example¶
This configuration is typically used with distributed workers. Write the following JSON to omnisci-sink-connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST API documentation.
{
"name" : "OmnisciSinkConnector",
"config" : {
"connector.class" : "io.confluent.connect.omnisci.OmnisciSinkConnector",
"tasks.max" : "1",
"topics": "orders",
"connection.database": "omnisci",
"connection.port": "6274",
"connection.host": "localhost",
"connection.user": "admin",
"connection.password": "HyperInteractive",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"auto.create": "true"
}
}
Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
Run the connector with this configuration.
curl -X POST -d @omnisci-sink-connector.json http://localhost:8083/connectors -H "Content-Type: application/json"
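To confirm the connector started successfully, you can query the Kafka Connect REST API for its status:
curl http://localhost:8083/connectors/OmnisciSinkConnector/status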
Next, create a record in the orders topic:
bin/kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'
The console producer is waiting for input. Copy and paste the following record into the terminal:
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
To verify the data in HEAVY-AI, log in to the Docker container using the following command:
docker exec -it <containerid> bash
Tip
To find the container ID, run the following command:
docker ps
Once you are inside the Docker container, launch omnisql:
bin/omnisql
When prompted for a password, enter HyperInteractive.
Finally, run the following SQL query to verify the records:
omnisql> select * from orders;
foo|50.0|100|999
HEAVY-AI Supported Versions¶
HEAVY-AI versions 4.5.0 and above are supported.
Data mapping¶
The sink connector requires knowledge of schemas, so you should use a suitable converter, such as the Avro converter that comes with Schema Registry or the JSON converter with schemas enabled. Kafka record keys, if present, can be primitive types or a Connect struct. The record value must be a Connect struct. Fields being selected from Connect structs must be primitive types. If the data in the topic is not in a compatible format, implementing a custom Converter may be necessary.
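As an illustration only, a record value for the orders schema used in the quick start, written with the JSON converter and schemas enabled, would look roughly like the following:
{
  "schema": {
    "type": "struct",
    "name": "orders",
    "optional": false,
    "fields": [
      {"field": "id", "type": "int32", "optional": false},
      {"field": "product", "type": "string", "optional": false},
      {"field": "quantity", "type": "int32", "optional": false},
      {"field": "price", "type": "float", "optional": false}
    ]
  },
  "payload": {"id": 999, "product": "foo", "quantity": 100, "price": 50.0}
}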
Auto-creation and Auto-evolution¶
Tip
Ensure the HEAVY-AI user has the appropriate permissions for DDL.
If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing. The creation takes place online as records are consumed from the topic, since the connector uses the record schema as a basis for the table definition.
Note that HEAVY-AI does not support default values for columns. If your schema has fields with default values, the fields are added, but the default value is ignored.
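For example, with the orders record schema from the quick start, the table created by the connector would be roughly equivalent to the following DDL (the exact type mapping is determined by the connector and may differ):
CREATE TABLE orders (id INTEGER, product TEXT, quantity INTEGER, price FLOAT);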
If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing.
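For example, if a new optional float field named discount (a hypothetical field) appears in the record schema, the connector would issue a statement roughly equivalent to the following:
ALTER TABLE orders ADD COLUMN discount FLOAT;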
Since data type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table. Addition of primary key constraints is also not attempted.
Important
For backward-compatible table schema evolution, new fields in record schemas must be optional. Mandatory fields, with or without a default value, are NOT supported. If you need to delete a field, manually alter the table schema to drop the corresponding column; marking the column nullable is not sufficient.