Vertica Sink Connector for Confluent Platform¶
重要
Vertica Sink Connector には、Vertica 9.0.1 以降のみと互換性があります。また、現時点では、アップデート機能がサポートされていません。
Kafka Connect Vertica Sink Connector は、データを Apache Kafka® のトピックから Vertica にエクスポートするために使用されます。Vertica Sink Connector では、Kafka からのレコードを定期的にポーリングし、Vertica テーブルに追加します。
機能¶
Vertica Sink Connector には、以下の機能があります。
少なくとも 1 回のデリバリー¶
このコネクターによって、Kafka のトピックのレコードが少なくとも 1 回は配信されることが保証されます。
複数のタスク¶
Vertica Sink Connector は、1 つまたは複数のタスクの実行をサポートしています。タスクの数は tasks.max
構成パラメーターで指定できます。これにより、複数のファイルを解析する必要がある場合に、パフォーマンスが大幅に向上する可能性があります。
自動作成と自動進化¶
ちなみに
DDL に対する適切なアクセス許可が Vertica ユーザーにあることを確認してください。詳細については、『Database Users and Privileges 』を参照してください。
auto.create
を有効にすると、送信先テーブルが存在しない場合に、コネクターでテーブルを作成することができます。コネクターはテーブル定義のベースとしてレコードスキーマを使用するので、作成は、レコードをトピックから取り込みながらオンラインで実行されます。コネクターでは、プライマリキーやプライマリキー制約の追加なしでテーブルが作成されます。ただし、auto.create
が無効で、データベースにテーブルがない場合、コネクタータスクは auto.create
が無効というエラーで失敗します。
auto.evolve
が有効である場合、列が存在しないと判明したレコードが見つかったときに、コネクターによって、送信先テーブルで ALTER
を発行して、制限付きの自動進化を実行できます。auto.evolve
を無効にすると、進化は実行されず、コネクタータスクは "列が存在しない" というエラーで失敗します。データ型の変更と列の削除は障害につながる可能性があるため、そのような進化をコネクターがテーブルに対して実行しようとすることはありません。また、コネクターによってプライマリキー制約の追加が試行されることもありません。
自動作成および自動進化のどちらについても、スキーマ内の列の対応するフィールドが省略可能の場合、スキーマには( null
ではない)デフォルト値が必要です。
重要
テーブルスキーマ進化の後方互換性を維持するため、レコードに存在しないフィールドにはテーブルにデフォルト値があることが必要です。存在しないフィールドに対し、テーブルにデフォルト値がなかった場合、そのレコードは拒否されます。コネクターでは、null
値のフィールドを持つレコードは拒否されます。
スキーマの型 | Vertica |
---|---|
INT8 | INT |
INT16 | INT |
INT32 | INT |
INT64 | INT |
FLOAT32 | FLOAT |
FLOAT64 | FLOAT |
BOOLEAN | BOOLEAN |
STRING | VARCHAR(1024) |
BYTES | VARBINARY(1024) |
Decimal | DECIMAL |
Date | DATE |
Time | TIME |
Timestamp | TIMESTAMP |
前提条件¶
Kafka Connect Vertica Sink Connector を実行するには、以下が必要です。
- Kafka ブローカー : Confluent Platform 3.3.0 以上、または Kafka 0.11.0 以上
- Connect: Confluent Platform 4.0.0 以上、または Kafka 1.0.0 以上
- Java 1.8
Vertica コネクターのインストール¶
このコネクターは、Confluent Hub クライアントのインストール手順 に従うか、手作業で ZIP ファイルをダウンロードしてインストールします。
前提条件¶
重要
このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
Confluent Hub クライアント のインストール。
注釈
これは、Confluent Enterprise とともにデフォルトでインストールされます。
コネクターの最新(
latest
)バージョンのインストール。コネクターの
latest
バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。confluent-hub install confluentinc/kafka-connect-vertica:latest
特定のバージョンをインストールするには、次の例に示すように
latest
をバージョン番号に置き換えます。confluent-hub install confluentinc/kafka-connect-vertica:1.0.0-preview
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
ライセンス¶
このコネクターは、ライセンスキーがなくても 30 日間試用できます。
30 日間経過後は、 Confluent エンタープライズライセンス を取得することで、このコネクターを利用できます。Confluent では、契約者に Confluent エンタープライズライセンス キーを発行します。併せて、Confluent Platform とコネクターに関する エンタープライズレベルのサポート を提供します。既にご契約されている場合は、詳細について Confluent サポート(support@confluent.io)にお問い合わせください。
ライセンスのプロパティについては、「 Confluent Platform ライセンス」を参照してください。ライセンストピックの詳細については、「 ライセンストピックの構成」を参照してください。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「Vertica Sink Connector 構成プロパティ」を参照してください。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター」を参照してください。
クイックスタート¶
このクイックスタートでは、Vertica コネクターを使用して、Avro コンソールプロデューサーで生成されたデータを Vertica データベースにエクスポートします。
注釈
これらのステップを開始する前に、Vertica データベースを起動 し、 Kafka のトピックと同じ名前のテーブルを手動で作成します。Kafka のトピックのデータの場合と同じスキーマを使用するか、auto.create=true
を追加します。
Vertica のセットアップ¶
Vertica を手動でセットアップするには、以下のコマンドを使用します。
Vertica イメージを Docker レジストリからプルし、永続データストアを使用して実行します。
docker pull dataplatform/docker-vertica docker run -p 5433:5433 -d -v /data/vertica/vertica_data:/home/dbadmin/docker dataplatform/docker-vertica
Docker イメージ ID を取得し、bash シェルをコンテナー内部で起動します。
docker ps docker exec -it <image_id> bash
Vertica コンソールを起動します。
cd /opt/vertica/bin ./vsql -hlocalhost -Udbadmin
テーブルを作成し、データを挿入します。
create table mytable(f1 varchar(20));
Confluent の開始¶
Confluent CLI を使用してサービスを開始します。
confluent local services start
各サービスが順番に開始され、ステータスを含むメッセージが出力されます。
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
シンプルなスキーマを使用するレコードを何件か Kafka にインポートするには、以下のように Avro コンソールプロデューサーを起動します。
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic mytable \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
コンソールプロデューサーで、以下の内容を入力します。
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
入力された 3 件のレコードが、Kafka のトピック mytable
に Avro フォーマットでパブリッシュされます。
プロパティベースの例¶
コネクターの構成ファイルを作成します。このファイルは、コネクターの etc/kafka-connect-vertica/vertica-sink-connector.properties
に含まれています。この構成は、通常、 スタンドアロンワーカー で使用されます。
name=VerticaSinkConnector
tasks.max=1
topics=mytable
connector.class=io.confluent.vertica.VerticaSinkConnector
vertica.database=docker
vertica.host=127.0.0.1
vertica.port=5433
vertica.username=dbadmin
vertica.password=<password>
auto.create=true
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
以下のコマンドを使用して構成を読み込み、Vertica コネクターを起動します。
confluent local services connect connector load VerticaSinkConnector --config vertica-sink-connector.properties
{
"name" : "VerticaSinkConnector",
"config" : {
"tasks.max":"1",
"topics":"mytable",
"connector.class":"io.confluent.vertica.VerticaSinkConnector",
"vertica.database":"docker",
"vertica.host":"127.0.0.1",
"vertica.port":"5433",
"vertica.username":"dbadmin",
"vertica.password":"",
"auto.create":"true",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1"
},
"tasks": []
}
REST ベースの例¶
この設定は通常、 分散ワーカー で使用されます。以下の JSON を config.json
に書き込み、必要な値をすべて構成します。それから以下のコマンドを使用して、分散された Connect ワーカーのいずれかに構成をポストします。REST API の詳細については、こちらを参照してください。
{
"name" : "VerticaSinkConnector",
"config" : {
"tasks.max":"1",
"topics":"mytable",
"connector.class":"io.confluent.vertica.VerticaSinkConnector",
"vertica.database":"docker",
"vertica.host":"127.0.0.1",
"vertica.port":"5433",
"vertica.username":"dbadmin",
"vertica.password":"",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1"
},
"tasks": []
}
いずれかの Connect ワーカーに構成をポストするには、curl
を使用します。http://localhost:8083/
を、Connect ワーカーのいずれかのエンドポイントに変更します。
curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
以下のコマンドを使用して、既存のコネクターの構成をアップデートします。
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/VerticaSinkConnector/config
コネクターが正常に起動されたことを確認します。以下のコマンドを使用して、Connect ワーカーのログを確認します。
confluent local services connect log
ログの最後の方で、コネクターが起動されたこと、ログにいくつかメッセージが記録されていること、データが Kafka から Vertica テーブルに追加されたことを確認できます。
コネクターによってレコードが取り込まれたら、Vertica コンソールで以下のコマンドを実行して、Vertica テーブルでデータを利用できるかどうかを確認します。
select * from mytable;
f1
--------
value1
value2
value3
(3 rows)
最後に、Connect ワーカーとその他すべての Confluent サービスを停止します。
confluent local stop
出力は以下のようになります。
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
サービスをすべて停止し、このクイックスタートで生成した全データを削除するには、以下のコマンドを入力します。
confluent local destroy
出力は以下のようになります。
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE