Kafka Connect OmniSci Sink Connector

The OmniSci sink connector allows you to export data from Apache Kafka® topics to OmniSci. The connector polls data from Kafka and writes it to OmniSci based on its topic subscription.

Limitations

  • This connector can only insert data into OmniSci. Updates are not supported.
  • If auto.create is enabled, the default values for fields are ignored. This is because OmniSci does not allow default values for columns.
  • If auto.evolve is enabled, the connector can only add new columns for fields that are marked optional. Mandatory fields are not supported, even if they have default values.
  • Deletion of fields is not supported, even for fields that were previously optional. If you must delete a field, manually drop the corresponding column from the OmniSci table.
  • This connector cannot alter the type of an existing column.

Install OmniSci Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-omnisci:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-omnisci:1.0.2

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Quick Start

In this quick start, you copy Avro data from a single topic to a local OmniSci database running in Docker.

This example assumes you are running Kafka and Schema Registry locally on the default ports. It also assumes you have Docker installed and running.

First, start an OmniSci database by running the following Docker command:

docker run -d -p 6274:6274 omnisci/core-os-cpu:v4.7.0

This starts the CPU-based community version of OmniSci and maps it to port 6274 on localhost. By default, the username is admin, the password is HyperInteractive, and the database is omnisci.

Start the Confluent Platform using the Confluent CLI command below.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.

confluent local start

Property-based example

Next, create a configuration file for the connector. This configuration is typically used with standalone workers. The file is included with the connector at ./etc/kafka-connect-omnisci/omnisci-sink-connector.properties and contains the following settings:

name=OmnisciSinkConnector
connector.class=io.confluent.connect.omnisci.OmnisciSinkConnector
tasks.max=1
topics=orders
connection.database=omnisci
connection.port=6274
connection.host=localhost
connection.user=admin
connection.password=HyperInteractive
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
auto.create=true

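The first few settings are common to all connectors, except for topics, which is specific to sink connectors like this one.

The connection.host, connection.port, and connection.database settings identify the local OmniSci database, and connection.user and connection.password supply its credentials. The confluent.topic.bootstrap.servers and confluent.topic.replication.factor settings configure the license topic; a replication factor of 1 is only suitable for a single-broker development cluster. Because auto.create is enabled, the connector creates the table if it does not exist.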

Run the connector with this configuration.

confluent local load OmnisciSinkConnector -- -d etc/kafka-connect-omnisci/omnisci-sink-connector.properties

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to omnisci-sink-connector.json, set all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST API documentation.

    {
      "name": "OmnisciSinkConnector",
      "config": {
        "connector.class": "io.confluent.connect.omnisci.OmnisciSinkConnector",
        "tasks.max": "1",
        "topics": "orders",
        "connection.database": "omnisci",
        "connection.port": "6274",
        "connection.host": "localhost",
        "connection.user": "admin",
        "connection.password": "HyperInteractive",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "auto.create": "true"
      }
    }
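The property-based and REST-based configurations carry the same settings: the REST payload lifts name to the top level, nests every other key under config, and keeps all values as strings. If you already maintain the properties file, a minimal Python sketch like the following can generate the JSON from it. The function name properties_to_rest_payload is hypothetical, and the parser assumes simple key=value lines (no ':' separators, line continuations, or escapes, all of which the full Java properties format allows).

```python
import json


def properties_to_rest_payload(text: str) -> dict:
    """Convert simple key=value connector properties into a Connect REST payload.

    Assumption: one property per line, '=' as the separator. Blank lines and
    lines starting with '#' or '!' are skipped; line continuations, ':'
    separators, and escape sequences are not handled.
    """
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first '=' only, so values may themselves contain '='.
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {raw!r}")
        config[key.strip()] = value.strip()
    # The REST API expects the connector name at the top level and every
    # other setting, as a string, under "config".
    name = config.pop("name")
    return {"name": name, "config": config}


if __name__ == "__main__":
    sample = "name=OmnisciSinkConnector\ntopics=orders\ntasks.max=1\n"
    print(json.dumps(properties_to_rest_payload(sample), indent=2))
```

Writing the result of properties_to_rest_payload for the full properties file with json.dump(..., indent=2) should produce a file equivalent to omnisci-sink-connector.json above.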

Use curl to post the configuration to one of the Kafka Connect workers and run the connector. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

curl -X POST -d @omnisci-sink-connector.json http://localhost:8083/connectors -H "Content-Type: application/json"
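After the POST succeeds, you can confirm that the connector and its tasks are running through the Kafka Connect REST API, for example with curl http://localhost:8083/connectors/OmnisciSinkConnector/status. The following Python sketch performs the same check using only the standard library; the helper names are hypothetical, and it assumes the worker is reachable at the base URL you pass in.

```python
import json
import urllib.parse
import urllib.request


def fetch_status(base_url: str, connector: str) -> dict:
    """GET /connectors/<name>/status from a Kafka Connect worker."""
    name = urllib.parse.quote(connector, safe="")
    url = f"{base_url.rstrip('/')}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def is_running(status: dict) -> bool:
    """True only if the connector and every one of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # With no tasks assigned yet, nothing is consuming from the topic.
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


def failed_task_traces(status: dict) -> list:
    """Collect the error traces Connect reports for FAILED tasks."""
    return [t.get("trace", "") for t in status.get("tasks", [])
            if t.get("state") == "FAILED"]
```

For example, is_running(fetch_status("http://localhost:8083", "OmnisciSinkConnector")) should return True once the task has started; if a task fails, failed_task_traces shows the error that Connect reported, which is often a connection or permission problem on the OmniSci side.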

Next, create a record in the orders topic:

bin/kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'

The console producer is waiting for input. Copy and paste the following record into the terminal:

{"id": 999, "product": "foo", "quantity": 100, "price": 50}

To verify the data in OmniSci, log in to the Docker container using the following command:

docker exec -it <containerid> bash

Tip

To find the container ID, run the following command:

docker ps

Once you are inside the Docker container, launch omnisql:

bin/omnisql

When prompted for a password, enter HyperInteractive.

Finally, run the following SQL query to verify the records:

omnisql> select * from orders;
    foo|50.0|100|999

OmniSci Supported Versions

OmniSci versions 4.5.0 and above are supported.

Data mapping

The sink connector requires knowledge of schemas, so you should use a suitable converter like the Avro converter that comes with Schema Registry or the JSON converter with schemas enabled. Kafka record keys (if present) can be primitive types or a Connect struct. The record value must be a Connect struct. Fields being selected from Connect structs must be primitive types. If the data in the topic is not of a compatible format, implementing a custom Converter may be necessary.
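If you use the JSON converter instead of Avro, schemas must be enabled (value.converter=org.apache.kafka.connect.json.JsonConverter with value.converter.schemas.enable=true), and each message then carries its schema alongside the data in a schema/payload envelope. The Python sketch below builds such an envelope for the orders record from the quick start and checks the rules in this section: the value must be a struct and its fields must be primitive types. The helper names and the struct name are hypothetical, and only the value side is checked.

```python
import json

# Assumed converter settings (worker or connector level):
#   value.converter=org.apache.kafka.connect.json.JsonConverter
#   value.converter.schemas.enable=true

# Primitive type names as they appear in JsonConverter schemas.
PRIMITIVES = {"int8", "int16", "int32", "int64", "float", "double",
              "boolean", "string", "bytes"}


def make_envelope(name, fields, payload):
    """Wrap a payload in the schema/payload envelope that JsonConverter
    expects when schemas are enabled. `fields` holds (name, type, optional)."""
    schema = {
        "type": "struct",
        "name": name,
        "optional": False,
        "fields": [{"field": f, "type": t, "optional": opt}
                   for f, t, opt in fields],
    }
    return {"schema": schema, "payload": payload}


def check_value_schema(schema):
    """Return rule violations: the value must be a struct of primitive fields."""
    if schema.get("type") != "struct":
        return [f"value schema must be a struct, got {schema.get('type')!r}"]
    return [f"field {f['field']!r} has non-primitive type {f['type']!r}"
            for f in schema.get("fields", [])
            if f.get("type") not in PRIMITIVES]


if __name__ == "__main__":
    envelope = make_envelope(
        "orders",
        [("id", "int32", False), ("product", "string", False),
         ("quantity", "int32", False), ("price", "float", False)],
        {"id": 999, "product": "foo", "quantity": 100, "price": 50.0},
    )
    print(check_value_schema(envelope["schema"]))  # empty list if valid
    # One line per Kafka message, e.g. for kafka-console-producer.
    print(json.dumps(envelope))
```

Embedding the schema in every message makes JSON records considerably larger than their Avro equivalents, which is one reason the Avro converter with Schema Registry is usually preferred.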

Auto-creation and Auto-evolution

Tip

Make sure the OmniSci user has the appropriate permissions for DDL.

If auto.create is enabled, the connector can CREATE the destination table if it is missing. The table is created online, as records are consumed from the topic, because the connector uses the record schema as the basis for the table definition.

Note that OmniSci does not support default values for columns. If your schema has fields with default values, the corresponding columns are added, but the default values are ignored.
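To illustrate how a table definition can be derived from a record schema, and why defaults disappear in the process, here is a Python sketch that turns Connect-style field descriptions into a CREATE TABLE statement. The type mapping and the function name are assumptions made for illustration; the connector's actual type mapping and generated DDL may differ.

```python
# Hypothetical mapping from Connect primitive types to OmniSci column types,
# chosen for illustration only; the connector's real mapping may differ.
TYPE_MAP = {
    "int8": "TINYINT",
    "int16": "SMALLINT",
    "int32": "INTEGER",
    "int64": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "boolean": "BOOLEAN",
    "string": "TEXT",
}


def create_table_sql(table, fields):
    """Derive a CREATE TABLE statement from record-schema fields.

    Each field is a dict with "field", "type", "optional", and possibly
    "default". Defaults are dropped on purpose: OmniSci columns cannot
    carry default values.
    """
    columns = []
    for f in fields:
        column = f"{f['field']} {TYPE_MAP[f['type']]}"
        if not f.get("optional", False):
            column += " NOT NULL"
        # f.get("default") is intentionally not emitted.
        columns.append(column)
    return f"CREATE TABLE {table} ({', '.join(columns)});"


if __name__ == "__main__":
    print(create_table_sql("orders", [
        {"field": "id", "type": "int32", "optional": False},
        {"field": "product", "type": "string", "optional": False},
        {"field": "quantity", "type": "int32", "optional": False},
        {"field": "price", "type": "float", "optional": False},
        {"field": "discount", "type": "float", "optional": True, "default": 0.0},
    ]))
```

The discount field in the demo is hypothetical; its default of 0.0 does not appear anywhere in the generated statement, which mirrors the behavior described above.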

If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing.

Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table. Addition of primary key constraints is also not attempted.

Important

For backward-compatible table schema evolution, new fields in record schemas must be optional. Mandatory fields, with or without a default value, are NOT supported. If you need to delete a field, the table schema should be manually altered to drop the corresponding column. Marking the column nullable does not work. You must drop the corresponding column.
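Before registering a new version of a record schema, you can check it against these rules. The Python sketch below compares two Avro record schemas and reports added fields that are not optional (in Avro, a field maps to an optional Connect field when its type is a union that includes "null"), removed fields, and changed field types. It is a hypothetical pre-flight check, not part of the connector, and it compares types literally, so it does not model Avro type promotion.

```python
def is_optional(avro_type) -> bool:
    """An Avro field becomes an optional Connect field when its type is a
    union that includes "null", e.g. ["null", "float"]."""
    return isinstance(avro_type, list) and "null" in avro_type


def evolution_problems(old_schema: dict, new_schema: dict) -> list:
    """List changes between two Avro record schemas that the sink connector
    cannot apply to the OmniSci table by itself."""
    old = {f["name"]: f["type"] for f in old_schema["fields"]}
    new = {f["name"]: f["type"] for f in new_schema["fields"]}
    problems = []
    for name, avro_type in new.items():
        if name not in old:
            # Only optional additions can be applied with ALTER; a default
            # value on a mandatory field does not make it acceptable.
            if not is_optional(avro_type):
                problems.append(f"new field {name!r} must be optional")
        elif old[name] != avro_type:
            problems.append(f"type of {name!r} changed; the connector will not alter it")
    for name in sorted(old.keys() - new.keys()):
        problems.append(f"field {name!r} removed; drop the column manually")
    return problems
```

For example, adding {"name": "discount", "type": ["null", "float"], "default": null} to the quick start schema produces no problems, while adding the same hypothetical field with type "float" is reported, even if it has a default.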

Additional Documentation