Teradata Sink Connector for Confluent Platform

The Kafka Connect Teradata Sink connector allows you to export data from Apache Kafka® topics to Teradata. The connector polls data from Kafka to write to the database based on the topics subscription. Table auto-creation and limited auto-evolution are also supported.

Features

The Teradata Sink connector includes the following features:

At least once delivery

This connector guarantees that records are delivered at least once from the Kafka topic.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The Teradata Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when multiple files need to be parsed

Data mapping

The Teradata Sink connector requires knowledge of schemas, so you should use a suitable converter (for example, the Avro converter that comes with Schema Registry or the JSON converter with schemas enabled). If present, Kafka record keys can be primitive types or a Connect struct. The record value must be a Connect struct. Fields being selected from Connect structs must be of primitive types. If the data in the topic is not of a compatible format, implementing a custom Converter may be necessary.

Key handling

The default is for primary keys to not be extracted with pk.mode set to none. This is not suitable when the connector is responsible for auto-creating the destination table. There are different modes that can be enabled to use fields from the Kafka record key, the Kafka record value, or the Kafka coordinates for the record. Refer to primary key configuration options for additional information.

Auto-creation and auto-evolution

Note

The Teradata user requires CREATE TABLE permissions for this feature.

If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing. The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition. Primary keys are specified based on the key configuration settings.

If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table. Addition of primary key constraints is also not attempted. In contrast, if auto.evolve is disabled, no evolution is performed and the connector task fails with a missing columns error.

For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema. Default values are also specified based on the default value of the corresponding field (if applicable). Confluent uses the following mapping from Connect schema types to Teradata types:

Schema Type Teradata
INT8 BYTEINT
INT16 SMALLINT
INT32 INTEGER
INT64 BIGINT
FLOAT32 FLOAT
FLOAT64 DOUBLE PRECISION
BOOLEAN BYTEINT
STRING LONG VARCHAR CHARACTER SET UNICODE
BYTES VARBYTES (64000)
‘Decimal’ DECIMAL(38,s)
‘Date’ DATE
‘Time’ TIME WITH TIME ZONE
‘Timestamp’ TIMESTAMP WITH TIME ZONE

Important

For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.

Limitations

  • The Teradata Sink connector does not support nested data types. Confluent uses the mapping shown in table in the Auto-creation and auto-evolution section.
  • The Teradata Sink connector does not support upserts.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

For license properties, see Confluent Platform license, and for information about the license topic, see License topic configuration.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Teradata Sink Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Installing the SFTP Sink Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.
  • If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
  • Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later
  • Java 1.8
  • Teradata 15.00 or later
  • You must run the Teradata connector with a default timezone that does not observe Daylight Saving Time. This is a functional limitation of the Teradata JDBC driver and has no workaround. We recommend running your connect workers with the system property -Duser.timezone=UTC set.
  • Set up your Teradata Development Environment. Leave the database running while you complete the remainder of this guide.
  • If you are running a multi-node Connect cluster, the Teradata connector and JDBC driver JAR must be installed on every Connect worker in the cluster.Install the Teradata JDBC driver.

Installing the connector using Confluent Hub

To install the latest connector version using Confluent Hub Client, navigate to your Confluent Platform installation directory and run the following command:

confluent-hub install confluentinc/kafka-connect-teradata:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent-hub install confluentinc/kafka-connect-teradata:1.1.0

Installing the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Installing JDBC Drivers

The Teradata Source and Sink connectors use the Java Database Connectivity (JDBC) API that enables applications to connect to and use a wide range of database systems. In order for this to work, the connectors must have a JDBC Driver for Teradata.

  1. Navigate to the Teradata Downloads page
  2. Log in with your Teradata account if you are not signed in already.
  3. Download the JDBC driver corresponding to your Teradata version.
  4. Extract the archive to get a terajdbc4.jar file.
  5. Place this JAR file into the share/confluent-hub-components/confluentinc-kafka-connect-teradata/lib directory in your Confluent Platform installation on each of the Connect worker nodes.
  6. Restart all of the Connect worker nodes.

The rest of this section outlines the specific steps for more common database management systems.

General guidelines

The following are additional guidelines to consider:

  • The share/confluent-hub-components/confluentinc-kafka-connect-teradata/lib directory mentioned above is for Confluent Platform. If you are using a different installation, find the location where the Confluent Teradata source and sink connector JAR files are located, and place the JDBC driver JAR file(s) for the target databases into the same directory.
  • If the JDBC driver is not installed correctly, the Teradata source or sink connector will fail on startup. Typically, the system throws the error No suitable driver found. If this happens, install the JDBC driver again by following the instructions.
  • The connector must be run on a connect worker with a default timezone which does not observe daylight savings. If daylight savings is observed in your timezone, then the connector will fail to configure or start. You can manually override the worker timezone by setting the TZ environment variable before starting the worker, or on the command line with the JVM argument -Duser.timezone=UTC

Quick Start

To see the basic functionality of the connector, you copy Avro data from a single topic to a local Teradata development environment.

Load the Teradata Sink Connector

  1. Create a properties file for your Teradata Sink connector

    name=teradata-sink
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    connector.class=io.confluent.connect.teradata.TeradataSinkConnector
    tasks.max=1
    topics=orders
    teradata.url=jdbc:teradata://localhost
    teradata.database=dev
    teradata.username=dev
    teradata.password=dev
    pk.mode=kafka
    auto.create=true
    
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    
  2. Load the teradata-sink connector:

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services connect connector load teradata-sink --config teradata-sink.properties
    

    Your output should resemble:

    {
       "name": "teradata-sink",
       "config": {
         "confluent.topic.bootstrap.servers": "localhost:9092",
         "confluent.topic.replication.factor": "1",
         "connector.class": "io.confluent.connect.teradata.TeradataSinkConnector",
         "tasks.max": "1",
         "topics": "orders",
         "teradata.url": "jdbc:teradata://localhost",
         "teradata.database": "dev",
         "teradata.username": "dev",
         "teradata.password": "dev",
         "pk.mode": "kafka",
         "auto.create": "true",
         "key.converter": "io.confluent.connect.avro.AvroConverter",
         "key.converter.schema.registry.url": "http://localhost:8081",
         "value.converter": "io.confluent.connect.avro.AvroConverter",
         "value.converter.schema.registry.url": "http://localhost:8081",
         "name": "teradata-sink"
       },
       "tasks": [],
       "type": "sink"
     }
    

    Tip

    For non-CLI users, you can load the Teradata sink connector with the command below.

    ${CONFLUENT_HOME}/bin/connect-standalone \
    ${CONFLUENT_HOME}/etc/schema-registry/connect-avro-standalone.properties \
    teradata-sink.properties
    

Produce a record in Teradata

  1. Produce a record into the orders topic.

    ./bin/kafka-avro-console-producer \
    --broker-list localhost:9092 --topic orders \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price",
    "type": "float"}]}'
    

    The console producer waits for input.

  2. Copy and paste the following record into the terminal and press Enter:

    {"id": 999, "product": "foo", "quantity": 100, "price": 50}
    
  3. Log into database user that you created earlier

    bteq
    
    .logon dev
    
    dev
    
  4. In the BTEQ command prompt you should see that the orders table was automatically created and contains the record:

    SELECT * FROM orders;
    

    Your output should resemble:

    SELECT * FROM orders;
    
    *** Query completed. 1 rows found. 7 columns returned.
    *** Total elapsed time was 1 second.
    
    __connect_topic  product   quantity  __connect_partition   __connect_offset  price                   id
    --------------------------------------------------------------------------------------------------------
    orders           foo       100       0                     0                 5.00000000000000E 001   999
    

    Tip

    If you can’t see all the columns, try setting .width 300 and/or .set foldline.

Troubleshooting

Daylight saving time

When you run this connector, you might see the following error message.

{
  "error_code": 500,
  "message": "This Connector must be used on a connect worker with a default timezone which does not observe daylight savings time. This is a functional limitation of the Teradata JDBC driver and has no workaround. On the JVM arguments, specify -Duser.timezone=UTC to override the system default.
  ...
 }

In order to avoid this error, you must change the default timezone of the connect worker by adding -Duser.timezone=UTC to KAFKA_OPTS environment variable. If you start Kafka Connect worker from the command line, you can export the KAFKA_OPTS environment variable before starting Kafka Connect worker.

::
export KAFKA_OPTS=”-Duser.timezone=UTC” connect-distributed -daemon /etc/kafka/connect-distributed.properties

If Kafka Connect is started by systemd, add this to your Kafka Connect service file:

::
[Service] … Environment=KAFKA_OPTS=”-Duser.timezone=UTC” …