Vertica Sink Connector for Confluent Platform¶
You can use the Kafka Connect Vertica Sink connector to export data from Apache Kafka® topics to Vertica. The Vertica Sink connector periodically polls records from Kafka and adds them to a Vertica table.
Note
This connector is compatible with Vertica 9.0.1 and above.
Prerequisites¶
The following are required to run the Kafka Connect Vertica Sink Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
- Java 1.8
Install the Vertica Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-vertica:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-vertica:1.0.0-preview
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic ACLs for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Vertica Sink Connector Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Features¶
Auto-creation and Auto-evolution¶
Tip
Make sure the Vertica user has the appropriate permissions for DDL. For more information, see Database Users and Privileges.
If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing.
The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition.
The connector creates a table without adding a primary key or primary key constraints. However, if auto.create is disabled and the table is not present in the database, then the connector task fails with an error stating that auto.create is disabled.
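For illustration, with auto.create enabled and a topic named mytable whose records contain a single STRING field f1 (the schema used in the quick start later on this page), the connector would need a table along the lines of the following sketch. This is an approximation based on the type mapping table below, not the exact DDL the connector generates.
-- Illustrative sketch only; the connector derives the actual DDL from the record schema
CREATE TABLE mytable (
    f1 VARCHAR(1024)
);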
If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing. If auto.evolve is disabled, then no evolution is performed and the connector task fails with a “missing columns” error.
Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table. The connector also does not attempt to add primary key constraints.
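For illustration, if records begin to include an additional optional field f2 (a hypothetical name) with a non-null default, the evolution the connector performs would be roughly equivalent to the following statement. This is a sketch, not the exact DDL the connector issues.
-- Illustrative sketch only; the column name and default value are hypothetical
ALTER TABLE mytable ADD COLUMN f2 VARCHAR(1024) DEFAULT 'unknown';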
For both auto-creation and auto-evolution, if the corresponding field for the column in the schema is optional, then there must be a default value in the schema (not null).
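As an illustration, an optional field in the quick start value schema could be declared with a non-null default as shown below; the field name f2 and the default value "unknown" are hypothetical:
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name":"f2","type":["string","null"],"default":"unknown"}]}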
Important
For backward-compatible table schema evolution, missing fields in a record must have a default value in the table. If no default value is found in the table for the particular missing field, the record is rejected.
Note
The connector rejects records which have null value fields.
The connector maps Kafka Connect schema types to Vertica data types as follows:

| Schema Type | Vertica |
|---|---|
| INT8 | INT |
| INT16 | INT |
| INT32 | INT |
| INT64 | INT |
| FLOAT32 | FLOAT |
| FLOAT64 | FLOAT |
| BOOLEAN | BOOLEAN |
| STRING | VARCHAR(1024) |
| BYTES | VARBINARY(1024) |
| Decimal | DECIMAL |
| Date | DATE |
| Time | TIME |
| Timestamp | TIMESTAMP |
Quick Start¶
In this quick start, the Vertica Connector is used to export data produced by the Avro console producer to a Vertica database.
Note
Before you begin, start the Vertica database and manually create a table using the same name as the Kafka topic. Use the same schema as is used for the data in the Kafka topic, or add auto.create=true.
Set up Vertica¶
Use the following commands to manually set up Vertica.
Pull the Vertica image from Docker Registry and run it with a persistent datastore.
docker pull dataplatform/docker-vertica
docker run -p 5433:5433 -d -v /data/vertica/vertica_data:/home/dbadmin/docker dataplatform/docker-vertica
Get the Docker container ID and launch a bash shell within the container.
docker ps
docker exec -it <container_id> bash
Launch the Vertica console.
cd /opt/vertica/bin
./vsql -hlocalhost -Udbadmin
Create the table.
create table mytable(f1 varchar(20));
Start Confluent¶
Start the services using the Confluent CLI.
confluent local start
Every service starts in order, printing a message with its status.
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
To import a few records with a simple schema into Kafka, start the Avro console producer as follows:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic mytable \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Then, in the console producer, enter the following:
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
The three records entered are published to the Kafka topic mytable in Avro format.
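If you want to confirm that the records reached the topic before the connector consumes them, you can optionally read them back with the Avro console consumer (this step is not required for the quick start):
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic mytable --from-beginning --max-messages 3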
Property-based example¶
Create a configuration file for the connector.
This file is included with the connector in etc/kafka-connect-vertica/vertica-sink-connector.properties.
This configuration is typically used with standalone workers.
name=VerticaSinkConnector
tasks.max=1
topics=mytable
connector.class=io.confluent.vertica.VerticaSinkConnector
vertica.database=docker
vertica.host=127.0.0.1
vertica.port=5433
vertica.username=dbadmin
vertica.password=<password>
auto.create=true
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Start the Vertica connector by loading its configuration with the following command:
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local load VerticaSinkConnector -- -d vertica-sink-connector.properties
{
"name" : "VerticaSinkConnector",
"config" : {
"tasks.max":"1",
"topics":"mytable",
"connector.class":"io.confluent.vertica.VerticaSinkConnector",
"vertica.database":"docker",
"vertica.host":"127.0.0.1",
"vertica.port":"5433",
"vertica.username":"dbadmin",
"vertica.password":"",
"auto.create":"true",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1"
},
"tasks": []
}
REST-based example¶
Use this setting with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the following command to post the configuration to one of the distributed Connect workers. Check here for more information about the Connect REST API.
{
"name" : "VerticaSinkConnector",
"config" : {
"tasks.max":"1",
"topics":"mytable",
"connector.class":"io.confluent.vertica.VerticaSinkConnector",
"vertica.database":"docker",
"vertica.host":"127.0.0.1",
"vertica.port":"5433",
"vertica.username":"dbadmin",
"vertica.password":"",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1"
},
"tasks": []
}
Use curl to post the configuration to one of the Connect workers. Change http://localhost:8083/ to the endpoint of one of your Connect workers.
curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
Use the following command to update the configuration of an existing connector:
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/VerticaSinkConnector/config
Check that the connector started successfully. Review the Connect worker’s log with the following command:
confluent local log connect
Toward the end of the log you should see that the connector starts, logs a few messages, and then adds data from Kafka to the Vertica table.
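You can also check the connector and task state through the Connect REST API; a RUNNING state indicates the connector started successfully:
curl -s http://localhost:8083/connectors/VerticaSinkConnector/status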
Once the connector has ingested records, check whether the data is available in the Vertica table by running the following command in the Vertica console:
select * from mytable;
f1
--------
value1
value2
value3
(3 rows)
Finally, stop the Connect worker and all other Confluent services.
confluent local stop
Your output should resemble the following:
Stopping control-center
control-center is [DOWN]
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]
You can stop all services and remove any data generated during this quick start by entering the following command:
confluent local destroy
Your output should resemble the following:
Stopping control-center
control-center is [DOWN]
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE