Vertica Sink Connector for Confluent Platform¶
You can use the Kafka Connect Vertica Sink connector to export data from Apache Kafka® topics to Vertica. The Vertica Sink connector periodically polls records from Kafka and adds them to a Vertica table.
Important
The Vertica Sink connector is only compatible with Vertica 9.0.1 and later, and does not support update functionality at this time.
Features¶
The Vertica Sink connector includes the following features:
At least once delivery¶
This connector guarantees that records from the Kafka topic are delivered at least once.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
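The DLQ is configured through the standard Kafka Connect error-handling properties in the sink connector configuration. The following property fragment is a minimal sketch with illustrative values; the topic name dlq-vertica is only an example, not a default:
# Continue processing when a record fails instead of stopping the task (example value)
errors.tolerance=all
# Route failed records to this topic; the name is an example you choose
errors.deadletterqueue.topic.name=dlq-vertica
errors.deadletterqueue.topic.replication.factor=1
# Optionally add error context to the headers of records written to the DLQ
errors.deadletterqueue.context.headers.enable=true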
Multiple tasks¶
The Vertica Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance when the Kafka topic has multiple partitions, as shown in the sketch below.
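For example, the following property fragment, a sketch with an illustrative value, runs the connector with up to three tasks:
# Run up to three tasks; useful when the Kafka topic has at least three partitions
tasks.max=3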
Auto-creation and auto-evolution¶
Tip
Ensure the Vertica user has the appropriate permissions for DDL. For more information see Database Users and Privileges.
If auto.create is enabled, the connector can create the destination table if it is missing. The table is created online, while records are being consumed from the topic, because the connector uses the record schema as the basis for the table definition. The connector creates the table without adding a primary key or primary key constraints. However, if auto.create is disabled and the table is not present in the database, the connector task fails with an error stating that auto.create is disabled.
If auto.evolve
is enabled, the connector can perform limited auto-evolution
by issuing ALTER
on the destination table when it encounters a record for
which a column is found to be missing. If auto.evolve
is disabled then no
evolution is performed and the connector task fails with a “missing columns”
error. Because data type changes and removal of columns can be dangerous, the
connector does not attempt to perform such evolutions on the table. Also, the
connector does not attempt to add primary key constraints.
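For example, the following connector property fragment, a sketch with illustrative values, enables both behaviors:
# Create the destination table from the record schema if it does not exist
auto.create=true
# Add columns to the destination table when records contain new fields
auto.evolve=true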
For both auto-creation and auto-evolution, if the corresponding field in the record schema is optional, then the schema must also provide a non-null default value for that field.
Important
For backward-compatible table schema evolution, missing fields in a record
must have a default value in the table. If no default value is found in the
table for the particular missing field, the record is rejected. The
connector rejects records which have null
value fields.
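As an illustration, the following Avro value schema is a sketch (the field names are only examples) in which the optional field f2 declares a non-null default value, satisfying the requirement that optional fields carry a default:
{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {"name": "f1", "type": "string"},
    {"name": "f2", "type": ["string", "null"], "default": "unknown"}
  ]
}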
The following table shows how the connector maps Connect schema types to Vertica data types:
Schema Type | Vertica Type |
---|---|
INT8 | INT |
INT16 | INT |
INT32 | INT |
INT64 | INT |
FLOAT32 | FLOAT |
FLOAT64 | FLOAT |
BOOLEAN | BOOLEAN |
STRING | VARCHAR(1024) |
BYTES | VARBINARY(1024) |
Decimal | DECIMAL |
Date | DATE |
Time | TIME |
Timestamp | TIMESTAMP |
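For example, with auto.create enabled and the single STRING field f1 used in the quick start below, the table the connector creates would look roughly like the following sketch (no primary key is added):
-- Approximate DDL produced for a STRING field; actual output may differ
CREATE TABLE mytable (f1 VARCHAR(1024));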
Install the Vertica connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.
Java 8+. Note that Java 8 is deprecated in versions 7.2 and later of Confluent Platform. For more details, view Java compatibility with Confluent Platform by version.
You must install the connector on every machine where Connect will run.
Install the latest (latest) connector version.
Download and install the vertica-jdbc.jar file and place it in the connector's /lib folder, as shown in the following example:
<path-to-connector>/confluentinc-kafka-connect-vertica-1.2.2/lib/vertica-jdbc-9.2.1-0.jar
Install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
Install the connector using the Confluent CLI¶
To install the latest
connector version, navigate to your Confluent Platform installation
directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-vertica:latest
You can install a specific version by replacing latest
with a version
number as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-vertica:1.2.2
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license and for information about the license topic, see License topic configuration.
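For a subscribed deployment, the license key is typically supplied through the connector configuration. The following property fragment is a sketch; the key value is a placeholder:
# License key provided with your Confluent subscription (placeholder value)
confluent.license=<license-key>
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1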
Configuration properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Vertica Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Quick start¶
In this quick start, the Vertica connector exports data produced by the Avro console producer to a Vertica database.
Before you begin, start the Vertica database and manually create a table with the same name as the Kafka topic. Use the same schema that is used for the data in the Kafka topic, or set auto.create=true in the connector configuration so the connector creates the table for you.
Set up Vertica¶
To manually set up Vertica, complete the following steps:
Pull the Vertica image from Docker Registry and run it with a persistent datastore.
docker pull dataplatform/docker-vertica
docker run -p 5433:5433 -d -v /data/vertica/vertica_data:/home/dbadmin/docker dataplatform/docker-vertica
Get the Docker image ID and launch a bash shell within the container.
docker ps
docker exec -it <image_id> bash
Launch the Vertica console.
cd /opt/vertica/bin
./vsql -hlocalhost -Udbadmin
Create a table whose name matches the Kafka topic used later in this quick start.
create table mytable(f1 varchar(20));
Start Confluent services¶
Use the following steps to start Confluent services.
Start the services using the Confluent CLI.
confluent local services start
Every service starts in order, printing a message with its status.
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
Import a few records with a simple schema in Kafka by starting the Avro console producer as follows:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic mytable \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
In the console producer, enter the following:
{"f1": "value1"} {"f1": "value2"} {"f1": "value3"}
The three records entered are published to the Kafka topic
mytable
in Avro format.
Property-based example¶
Create a configuration file for the connector. This file is included with the connector in etc/kafka-connect-vertica/vertica-sink-connector.properties. This configuration is typically used with standalone workers.
name=VerticaSinkConnector
tasks.max=1
topics=mytable
connector.class=io.confluent.vertica.VerticaSinkConnector
vertica.database=docker
vertica.host=127.0.0.1
vertica.port=5433
vertica.username=dbadmin
vertica.password=<password>
auto.create=true
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Start the Vertica connector by loading its configuration with the following command:
confluent local services connect connector load VerticaSinkConnector --config vertica-sink-connector.properties
Your output should resemble the following:
{
"name" : "VerticaSinkConnector",
"config" : {
"tasks.max":"1",
"topics":"mytable",
"connector.class":"io.confluent.vertica.VerticaSinkConnector",
"vertica.database":"docker",
"vertica.host":"127.0.0.1",
"vertica.port":"5433",
"vertica.username":"dbadmin",
"vertica.password":"",
"auto.create":"true",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1"
},
"tasks": []
}
REST-based example¶
Use this setting with distributed workers.
Write the following JSON to config.json and configure all of the required values.
{
  "name" : "VerticaSinkConnector",
  "config" : {
    "tasks.max":"1",
    "topics":"mytable",
    "connector.class":"io.confluent.vertica.VerticaSinkConnector",
    "vertica.database":"docker",
    "vertica.host":"127.0.0.1",
    "vertica.port":"5433",
    "vertica.username":"dbadmin",
    "vertica.password":"",
    "confluent.topic.bootstrap.servers":"localhost:9092",
    "confluent.topic.replication.factor":"1"
  },
  "tasks": []
}
Use curl to post the configuration to one of the Connect workers. Change http://localhost:8083/ to the endpoint of one of your Connect workers. For more information about the REST API, see REST API.
curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
Use the following command to update the configuration of an existing connector:
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/VerticaSinkConnector/config
Verify the connector started successfully and review the Connect worker’s log with the following command:
confluent local services connect log
At the end of the log you should see that the connector starts, logs a few messages, and then adds data from Kafka to the Vertica table.
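You can also confirm the connector's state through the Connect REST API (assuming the worker listens on localhost:8083, as in the REST-based example above):
curl -s http://localhost:8083/connectors/VerticaSinkConnector/status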
Once the connector has ingested records, check whether the data is available in the Vertica table by running the following command in the Vertica console:
select * from mytable;
   f1
--------
 value1
 value2
 value3
(3 rows)
Stop the Connect worker and all other Confluent services.
confluent local services stop
Your output should resemble the following:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Stop all services and remove any data generated during this quick start by entering the following command:
confluent local destroy
Your output should resemble the following:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE