OmniSci Sink Connector for Confluent Platform¶
Kafka Connect OmniSci Sink Connector を使用することにより、Apache Kafka® のトピックから OmniSci にデータをエクスポートすることができます。コネクターによって Kafka からデータがポーリングされて、トピックのサブスクリプションに基づいて OmniSci に書き込まれます。
機能¶
OmniSci Sink Connector には、以下の機能があります。
少なくとも 1 回のデリバリー¶
このコネクターによって、Kafka のトピックのレコードが少なくとも 1 回は配信されることが保証されます。
複数のタスク¶
OmniSci Sink Connector は、1 つまたは複数のタスクの実行をサポートしています。タスクの数は tasks.max
構成パラメーターで指定できます。これにより、複数のファイルを解析する必要がある場合に、パフォーマンスが大幅に向上する可能性があります。
制限¶
- このコネクターでは、OmniSci へのデータの挿入のみできます。アップデートはサポートされていません。
auto.create
が有効の場合、フィールドのデフォルト値は無視されます。これは、OmniSci では列にデフォルト値を使用できないからです。auto.evolve
が有効の場合、コネクターに追加できるのは、省略可能とマークされているフィールドの新しい列のみです。必須フィールドは、デフォルト値があるとしてもサポートされません。- フィールドの削除はサポートされていません。以前は省略可能だったフィールドも削除できません。フィールドを削除する必要がある場合は、対応する OmniSci テーブルから列を手動で削除します。
- このコネクターでは、既存の列の型を変更できません。
OmniSci コネクターのインストール¶
このコネクターは、Confluent Hub クライアントのインストール手順 に従うか、手作業で ZIP ファイルをダウンロードしてインストールします。
前提条件¶
重要
このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
Confluent Hub クライアント のインストール。
注釈
これは、Confluent Enterprise とともにデフォルトでインストールされます。
コネクターの最新(
latest
)バージョンのインストール。コネクターの
latest
バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。confluent-hub install confluentinc/kafka-connect-omnisci:latest
特定のバージョンをインストールするには、次の例に示すように
latest
をバージョン番号に置き換えます。confluent-hub install confluentinc/kafka-connect-omnisci:1.0.2
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
ライセンス¶
このコネクターは、30 日間の試用期間の間は、ライセンスキーなしで使用できます。
30 日間経過後は、 Confluent エンタープライズライセンス を取得することで、このコネクターを利用できます。Confluent では、契約者に Confluent エンタープライズライセンス キーを発行します。併せて、Confluent Platform とコネクターに関する エンタープライズレベルのサポート を提供します。既にご契約の場合は、詳細について Confluent サポート(support@confluent.io)にお問い合わせください。
ライセンスのプロパティについては、「Confluent Platform ライセンス」を参照してください。ライセンスのトピックの詳細については、「ライセンストピックの構成」を参照してください。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「OmniSci Sink Connector 構成プロパティ」を参照してください。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター」を参照してください。
クイックスタート¶
このクイックスタートでは、単一のトピックから Docker で実行されているローカル OmniSci データベースへ Avro データをコピーします。
この例では、Kafka および スキーマレジストリ がデフォルトのポートでローカルに実行されていることを前提としています。また、Docker がインストールされ実行されていることも前提としています。
まず、以下の Docker コマンドを実行して、OmniSci データベースを開始します。
docker run -d -p 6274:6274 omnisci/core-os-cpu:v4.7.0
CPU ベースのコミュニティバージョンの OmniSci が起動され、localhost のポート 6274 にマップされます。デフォルトで、ユーザー名は admin
、パスワードは HyperInteractive
です。デフォルトのデータベースは omnisci
です。
以下の Confluent CLI コマンドを使用して Confluent Platform を開始します。
ちなみに
Confluent CLI 開発用コマンドのコマンド構文は、5.3.0 で変更されています。該当するコマンドは confluent local
に移行されています。たとえば、confluent start
の構文は、confluent local services start
に変わりました。詳しくは、「confluent local」を参照してください。
confluent local services start
プロパティベースの例¶
次に、コネクターの構成ファイルを作成します。この構成は、通常、 スタンドアロンワーカー で使用されます。このファイルは、コネクターの ./etc/kafka-connect-omnisci/omnisci-sink-connector.properties
に含まれ、以下の設定があります。
name=OmnisciSinkConnector
connector.class=io.confluent.connect.omnisci.OmnisciSinkConnector
tasks.max=1
topics=orders
connection.database=omnisci
connection.port=6274
connection.host=localhost
connection.user=admin
connection.password=HyperInteractive
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
auto.create=true
この例のようにシンクコネクターに固有のトピックを除き、最初のいくつかの設定は、すべてのコネクターに対して指定する共通の設定です。
connection.url
、connection.user
、connection.password
には、ローカル OmniSci データベースの接続 URL、ユーザー名、パスワードを指定します。auto.create
が有効であることから、テーブルが存在しない場合はコネクターによってテーブルが作成されます。
この構成でコネクターを実行します。
confluent load OmnisciSinkConnector -d etc/kafka-connect-omnisci/omnisci-sink-connector.properties
REST ベースの例¶
この構成は通常、 分散ワーカー と併せて使用します。以下の json を omnisci-sink-connector.json
に書き込み、すべての必要な値を構成し、以下のコマンドを使用して構成を分散された Connect ワーカーのいずれかにポストします。Kafka Connect REST API の詳細については、こちらを参照してください。
{
"name" : "OmnisciSinkConnector",
"config" : {
"connector.class" : "io.confluent.connect.omnisci.OmnisciSinkConnector",
"tasks.max" : "1",
"topics": "orders",
"connection.database": "omnisci",
"connection.port": "6274",
"connection.host": "localhost",
"connection.user": "admin",
"connection.password": "HyperInteractive",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"auto.create": "true"
}
}
いずれかの Kafka Connect ワーカーに構成をポストするには、curl を使用します。"http://localhost:8083/" は、Kafka Connect ワーカーのいずれかのエンドポイントに変更してください。
この構成でコネクターを実行します。
curl -X POST -d @omnisci-sink-connector.json http://localhost:8083/connectors -H "Content-Type: application/json"
次に、orders
トピックにレコードを作成します。
bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price",
"type": "float"}]}'
コンソールプロデューサーは、入力を待機しています。以下のレコードをターミナルにコピーアンドペーストします。
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
OmniSci のデータを検証するには、以下のコマンドを使用して Docker コンテナーにログインします。
docker exec -it <containerid> bash
ちなみに
コンテナー ID を取得するには、以下のコマンドを実行します。
docker ps
Docker コンテナーに入ったら、omnisql を起動します。
bin/omnisql
パスワードを求められたら、HyperInteractive
と入力します。
最後に、以下の SQL クエリを実行して、レコードを検証します。
omnisql> select * from orders;
foo|50.0|100|999
サポートされる OmniSci バージョン¶
OmniSci バージョン 4.5.0 以降がサポートされます。
データマッピング¶
シンクコネクターは、スキーマを認識していないので、スキーマレジストリ に付属の Avro コンバーター、またはスキーマが有効になっている JSON コンバーターなどの適切なコンバーターを使用する必要があります。Kafka レコードキーがある場合は、プリミティブ型、または Connect 構造体にすることができます。レコード値は Connect 構造体にする必要があります。Connect 構造体から選択されるフィールドは、プリミティブ型にする必要があります。トピックのデータが互換性のあるフォーマットでない場合は、カスタムの Converter
の実装が必要になることがあります。
自動作成と自動進化¶
ちなみに
DDL に対する適切なアクセス許可が OmniSci ユーザーにあることを確認してください。
auto.create
を有効にすると、送信先テーブルが存在しない場合に、コネクターでテーブルを CREATE
することができます。コネクターはテーブル定義のベースとしてレコードスキーマを使用するので、作成は、レコードをトピックから消費しながらオンラインで実行されます。
OmniSci では、列でのデフォルト値の使用がサポートされていません。スキーマにデフォルト値のあるフィールドが存在する場合、該当するフィールドは追加されますが、デフォルト値は無視されます。
auto.evolve
が有効である場合、列が存在しないと判明したレコードが見つかったときに、コネクターによって、送信先テーブルで ALTER
を発行して、制限付きの自動進化を実行できます。
データ型の変更と列の削除は障害につながる可能性があるため、その種の進化をコネクターがテーブルに対して実行しようとすることはありません。プライマリキー制約の追加も試行されません。
重要
テーブルスキーマ進化の後方互換性を維持するため、レコードスキーマの新規フィールドは、省略可能にする必要があります。デフォルト値の有無にかかわらず、必須フィールドはサポートされません。フィールドを削除する必要がある場合は、テーブルのスキーマを手動で変更して、対応する列をドロップしてください。列を Null 許容とマークしても機能しません。対応する列の ドロップが必要 です。