.. _vertica-sink-connector:

Vertica Sink Connector for |cp|
===============================

You can use the |kconnect-long| Vertica Sink connector to export data from
|ak-tm| topics to Vertica. The Vertica Sink connector periodically polls
records from |ak| and adds them to a Vertica table.

.. note:: This connector is compatible with Vertica 9.0.1 and above.

Prerequisites
-------------

The following are required to run the |kconnect-long| Vertica Sink connector:

* |ak| Broker: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* |kconnect|: |cp| 4.0.0 or above, or |ak| 1.0.0 or above
* Java 1.8

.. _connect_vertica_install:

Install the Vertica Connector
-----------------------------

.. include:: ../../includes/connector-install.rst

.. include:: ../../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-vertica:latest

.. include:: ../../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-vertica:1.0.0-preview

------------------------------
Install the connector manually
------------------------------

`Download and extract the ZIP file `__ for your connector and then follow the
manual connector installation :ref:`instructions `.

.. _connect_vertica_license:

License
-------

.. include:: ../../includes/enterprise-license.rst

See :ref:`vertica-connector-license-config` for license properties and
:ref:`vertica-license-topic-configuration` for information about the license
topic.

Configuration Properties
------------------------

For a complete list of configuration properties for this connector, see
:ref:`connect_vertica_sink_configuration_options`.

.. include:: ../../includes/connect-to-cloud-note.rst

Features
--------

--------------------------------
Auto-creation and Auto-evolution
--------------------------------

.. tip:: Make sure the Vertica user has the appropriate permissions for DDL.
   For more information, see `Database Users and Privileges `__.

If ``auto.create`` is enabled, the connector can ``CREATE`` the destination
table if it is missing. Creation takes place online, while records are being
consumed from the topic, because the connector uses the record schema as the
basis for the table definition. The connector creates the table without a
primary key or primary key constraints. If ``auto.create`` is disabled and the
table is not present in the database, the connector task fails with an error
stating that ``auto.create`` is disabled.

If ``auto.evolve`` is enabled, the connector can perform limited
auto-evolution by issuing ``ALTER`` on the destination table when it
encounters a record for which a column is missing. If ``auto.evolve`` is
disabled, no evolution is performed and the connector task fails with a
"missing columns" error. Because data-type changes and column removal can be
dangerous, the connector does not attempt these evolutions on the table, nor
does it attempt to add primary key constraints.

For both auto-creation and auto-evolution, if the corresponding field in the
record schema is optional, it must have a default value in the schema (not
``null``).

.. important:: For backward-compatible table schema evolution, missing fields
   in a record must have a default value in the table. If no default value is
   found in the table for a missing field, the record is rejected.

.. note:: The connector rejects records that have ``null`` value fields.
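For example, suppose a topic ``mytable`` carries records whose value schema
has a required ``f1`` STRING field, and the schema later gains an optional
``f2`` INT32 field with a default of ``0`` (hypothetical names, for
illustration only). With ``auto.create`` and ``auto.evolve`` enabled, the
connector would issue DDL roughly equivalent to the following sketch. This is
an illustration of the behavior described above, not the connector's literal
output; the exact statements are generated internally and may differ.

.. code-block:: sql

   -- Sketch of auto.create: the table definition is derived from the
   -- record schema (see the type mapping table below); no primary key
   -- or primary key constraints are added.
   CREATE TABLE mytable (
       f1 VARCHAR(1024) NOT NULL
   );

   -- Sketch of auto.evolve: a new optional field with a schema default
   -- becomes an added column with a matching table default, keeping
   -- existing rows backward compatible.
   ALTER TABLE mytable ADD COLUMN f2 INT DEFAULT 0;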
The connector maps |kconnect| schema types to Vertica data types as follows:

+--------------------+--------------------+
| Schema Type        | Vertica            |
+====================+====================+
| INT8               | INT                |
+--------------------+--------------------+
| INT16              | INT                |
+--------------------+--------------------+
| INT32              | INT                |
+--------------------+--------------------+
| INT64              | INT                |
+--------------------+--------------------+
| FLOAT32            | FLOAT              |
+--------------------+--------------------+
| FLOAT64            | FLOAT              |
+--------------------+--------------------+
| BOOLEAN            | BOOLEAN            |
+--------------------+--------------------+
| STRING             | VARCHAR(1024)      |
+--------------------+--------------------+
| BYTES              | VARBINARY(1024)    |
+--------------------+--------------------+
| Decimal            | DECIMAL            |
+--------------------+--------------------+
| Date               | DATE               |
+--------------------+--------------------+
| Time               | TIME               |
+--------------------+--------------------+
| Timestamp          | TIMESTAMP          |
+--------------------+--------------------+

Quick Start
-----------

In this quick start, the Vertica connector is used to export data produced by
the Avro console producer to a Vertica database.

.. note:: Before you begin, `start the Vertica database `__ and manually
   create a table with the same name as the |ak| topic. Use the same schema
   as the data in the |ak| topic, or set ``auto.create=true``.

--------------
Set up Vertica
--------------

Use the following commands to manually set up Vertica.

#. Pull the Vertica image from the Docker registry and run it with a
   persistent data store.

   .. codewithvars:: bash

      docker pull dataplatform/docker-vertica
      docker run -p 5433:5433 -d -v /data/vertica/vertica_data:/home/dbadmin/docker dataplatform/docker-vertica

#. Get the Docker container ID and launch a bash shell within the container.

   .. codewithvars:: bash

      docker ps
      docker exec -it <container-id> bash

#. Launch the Vertica console.

   .. codewithvars:: bash

      cd /opt/vertica/bin
      ./vsql -hlocalhost -Udbadmin

#. Create the table. You can optionally verify its definition, as shown
   after this list.

   .. codewithvars:: bash

      create table mytable(f1 varchar(20));
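Optionally, confirm the table definition from the same ``vsql`` session
before starting the connector. This is a minimal check that queries Vertica's
``v_catalog.columns`` system view; ``mytable`` is the table created in the
step above.

.. code-block:: sql

   -- Inspect the columns of the newly created table; the single f1
   -- VARCHAR(20) column should match the Avro schema used below.
   SELECT column_name, data_type, is_nullable
   FROM v_catalog.columns
   WHERE table_name = 'mytable';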
---------------
Start Confluent
---------------

Start the services using the Confluent CLI.

.. codewithvars:: bash

   |confluent_start|

Every service starts in order, printing a message with its status.

.. include:: ../../../includes/cli.rst
   :start-after: CE_CLI_startup_output
   :end-before: COS_CP_CLI_startup_output

To import a few records with a simple schema into |ak|, start the Avro
console producer as follows:

.. codewithvars:: bash

   ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic mytable \
   --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Then, in the console producer, enter the following:

.. codewithvars:: bash

   {"f1": "value1"}
   {"f1": "value2"}
   {"f1": "value3"}

The three records entered are published to the |ak| topic ``mytable`` in Avro
format.

----------------------
Property-based example
----------------------

Create a configuration file for the connector. This file is included with the
connector in ``etc/kafka-connect-vertica/vertica-sink-connector.properties``.
This configuration is typically used with :ref:`standalone workers `.

::

   name=VerticaSinkConnector
   tasks.max=1
   topics=mytable
   connector.class=io.confluent.vertica.VerticaSinkConnector
   vertica.database=docker
   vertica.host=127.0.0.1
   vertica.port=5433
   vertica.username=dbadmin
   vertica.password=
   auto.create=true
   confluent.topic.bootstrap.servers=localhost:9092
   confluent.topic.replication.factor=1

Start the Vertica connector by loading its configuration with the following
command:

.. include:: ../../../includes/confluent-local-connector-limit.rst

.. codewithvars:: bash

   |confluent_load| VerticaSinkConnector|dash| -d vertica-sink-connector.properties

   {
     "name" : "VerticaSinkConnector",
     "config" : {
       "tasks.max":"1",
       "topics":"mytable",
       "connector.class":"io.confluent.vertica.VerticaSinkConnector",
       "vertica.database":"docker",
       "vertica.host":"127.0.0.1",
       "vertica.port":"5433",
       "vertica.username":"dbadmin",
       "vertica.password":"",
       "auto.create":"true",
       "confluent.topic.bootstrap.servers":"localhost:9092",
       "confluent.topic.replication.factor":"1"
     },
     "tasks": []
   }

------------------
REST-based example
------------------

Use this setting with :ref:`distributed workers `. Write the following JSON
to ``config.json``, configure all of the required values, and use the command
below to post the configuration to one of the distributed |kconnect| workers.
See the |kconnect| :ref:`REST API ` for more information.

.. code-block:: json

   {
     "name" : "VerticaSinkConnector",
     "config" : {
       "tasks.max":"1",
       "topics":"mytable",
       "connector.class":"io.confluent.vertica.VerticaSinkConnector",
       "vertica.database":"docker",
       "vertica.host":"127.0.0.1",
       "vertica.port":"5433",
       "vertica.username":"dbadmin",
       "vertica.password":"",
       "confluent.topic.bootstrap.servers":"localhost:9092",
       "confluent.topic.replication.factor":"1"
     },
     "tasks": []
   }

Use ``curl`` to post the configuration to one of the |kconnect| workers.
Change ``http://localhost:8083/`` to the endpoint of one of your |kconnect|
workers.

.. code-block:: bash

   curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors

Use the following command to update the configuration of an existing
connector:

.. code-block:: bash

   curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/VerticaSinkConnector/config

Check that the connector started successfully by reviewing the |kconnect|
worker's log with the following command:

.. codewithvars:: bash

   |confluent_log| connect

Toward the end of the log you should see that the connector starts, logs a
few messages, and then adds data from |ak| to the Vertica table.

After the connector has ingested records, check that the data is available in
the Vertica table by running the following command in the Vertica console:

.. codewithvars:: bash

   select * from mytable;

     f1
   --------
    value1
    value2
    value3
   (3 rows)

Finally, stop the |kconnect| worker and all other Confluent services.

.. codewithvars:: bash

   confluent local stop

Your output should resemble the following:

.. include:: ../../../includes/cli.rst
   :start-after: ce_cli_stop_output_start
   :end-before: ce_cli_stop_output_stop

You can stop all services and remove any data generated during this quick
start by entering the following command:

.. codewithvars:: bash

   |confluent_destroy|

Your output should resemble the following:

.. include:: ../../../includes/cli.rst
   :start-after: ce_cli_stop_destroy_output_start
   :end-before: ce_cli_stop_destroy_output_stop

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   configuration_options
   changelog