Teradata Sink Connector for Confluent Platform¶
The Kafka Connect Teradata Sink connector allows you to export data from Apache Kafka® topics to Teradata. The connector polls data from Kafka to write to the database based on the topics subscription. Table auto-creation and limited auto-evolution are also supported.
Features¶
The Teradata Sink connector includes the following features:
- At least once delivery
- Dead Letter Queue
- Multiple tasks
- Data mapping
- Key handling
- Auto-creation and auto-evolution
At least once delivery¶
This connector guarantees that records are delivered at least once from the Kafka topic.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
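For example, the following sketch of sink error-handling properties routes failed records to a dead letter queue topic instead of failing the task; the topic name and replication factor are example values:

# Keep the task running when a record cannot be processed
errors.tolerance=all
# Send failed records to a DLQ topic (example name)
errors.deadletterqueue.topic.name=dlq-teradata-sink
# Replication factor 1 is suitable only for single-broker development clusters
errors.deadletterqueue.topic.replication.factor=1
# Attach headers with error context to each DLQ record
errors.deadletterqueue.context.headers.enable=true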
Multiple tasks¶
The Teradata Sink connector supports running one or more tasks. You can specify
the number of tasks in the tasks.max configuration parameter. Running multiple
tasks can improve performance when the connector must write large volumes of
records to the database.
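For example, the following connector property (the value is illustrative) requests up to three tasks; effective parallelism is still limited by the number of partitions in the subscribed topics:

# Run up to three sink tasks in parallel
tasks.max=3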
Data mapping¶
The Teradata Sink connector requires knowledge of schemas, so you should use a
suitable converter (for example, the Avro converter that comes with Schema Registry or the
JSON converter with schemas enabled). If present, Kafka record keys can be
primitive types or a Connect struct. The record value must be a Connect struct.
Fields being selected from Connect structs must be of primitive types. If the
data in the topic is not of a compatible format, implementing a custom
Converter may be necessary.
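For example, the following connector properties (a sketch; the Schema Registry URL is an example value) configure schema-aware value converters:

# Avro values with Schema Registry (example URL)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Alternative: JSON values with embedded schemas enabled
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=true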
Key handling¶
By default, primary keys are not extracted: pk.mode is set to none. This is not
suitable when the connector is responsible for auto-creating the destination
table. Different modes can be enabled to use fields from the Kafka record key,
the Kafka record value, or the Kafka coordinates for the record. Refer to the
primary key configuration options for additional information.
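For example, assuming the connector exposes the same pk.mode and pk.fields options as other Confluent JDBC-based sink connectors, and that the record value contains an id field (an illustrative name), the key could be configured as follows:

# Derive the primary key from a field of the record value (field name is illustrative)
pk.mode=record_value
pk.fields=id
# Alternative: use the Kafka coordinates (topic, partition, offset), as in the Quick Start below
# pk.mode=kafka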
Auto-creation and auto-evolution¶
Note
The Teradata user requires CREATE TABLE permissions for this feature.
If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing. The creation takes place online as records are consumed from the topic, since the connector uses the record schema as the basis for the table definition. Primary keys are specified based on the key configuration settings.

If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table. Addition of primary key constraints is also not attempted. In contrast, if auto.evolve is disabled, no evolution is performed and the connector task fails with a missing-columns error.
For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema. Default values are also specified based on the default value of the corresponding field (if applicable). Confluent uses the following mapping from Connect schema types to Teradata types:
| Schema Type | Teradata |
|---|---|
| INT8 | BYTEINT |
| INT16 | SMALLINT |
| INT32 | INTEGER |
| INT64 | BIGINT |
| FLOAT32 | FLOAT |
| FLOAT64 | DOUBLE PRECISION |
| BOOLEAN | BYTEINT |
| STRING | LONG VARCHAR CHARACTER SET UNICODE |
| BYTES | VARBYTE(64000) |
| 'Decimal' | DECIMAL(38,s) |
| 'Date' | DATE |
| 'Time' | TIME WITH TIME ZONE |
| 'Timestamp' | TIMESTAMP WITH TIME ZONE |
Important
For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.
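As an illustrative sketch only, and not the exact DDL the connector issues, a value schema with a required int32 field id and an optional string field product would map to a table roughly like the following under the type mapping above:

-- Hypothetical result of auto.create for a topic named "orders",
-- assuming pk.mode=record_value and pk.fields=id
CREATE TABLE orders (
  id INTEGER NOT NULL,
  product LONG VARCHAR CHARACTER SET UNICODE,
  PRIMARY KEY (id)
);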
Limitations¶
The Teradata Sink connector does not support nested data types. Confluent uses the mapping shown in the table in the Auto-creation and auto-evolution section.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license, and for information about the license topic, see License topic configuration.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Properties.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Installing the Teradata Sink Connector¶
You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later
- Java 1.8
- Teradata 15.00 or later
- You must run the Teradata connector with a default timezone that does not observe Daylight Saving Time. This is a functional limitation of the Teradata JDBC driver and has no workaround. We recommend running your Connect workers with the system property -Duser.timezone=UTC set.
- Set up your Teradata Development Environment. Leave the database running while you complete the remainder of this guide.
- If you are running a multi-node Connect cluster, the Teradata connector and JDBC driver JAR must be installed on every Connect worker in the cluster. Install the Teradata JDBC driver.
Installing the connector using Confluent Hub¶
To install the latest
connector version using Confluent Hub Client, navigate to your Confluent Platform installation directory
and run the following command:
confluent-hub install confluentinc/kafka-connect-teradata:latest
You can install a specific version by replacing latest
with a version number
as shown in the following example:
confluent-hub install confluentinc/kafka-connect-teradata:1.0.5
Installing the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Installing JDBC Drivers¶
The Teradata Source and Sink connectors use the Java Database Connectivity (JDBC) API, which enables applications to connect to and use a wide range of database systems. For this to work, the connectors must have access to a JDBC driver for Teradata.
- Navigate to the Teradata Downloads page
- Log in with your Teradata account if you are not signed in already.
- Download the JDBC driver corresponding to your Teradata version.
- Extract the archive to get a terajdbc4.jar file.
- Place this JAR file into the share/confluent-hub-components/confluentinc-kafka-connect-teradata/lib directory in your Confluent Platform installation on each of the Connect worker nodes (see the example below).
- Restart all of the Connect worker nodes.
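The copy step might look like the following on each worker; this is a sketch that assumes CONFLUENT_HOME points to your Confluent Platform installation and that the driver was extracted to the current directory:

# Copy the Teradata JDBC driver next to the connector JARs
cp terajdbc4.jar ${CONFLUENT_HOME}/share/confluent-hub-components/confluentinc-kafka-connect-teradata/lib/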
The rest of this section provides general guidelines for installing and troubleshooting the JDBC driver.
General guidelines¶
The following are additional guidelines to consider:
- The share/confluent-hub-components/confluentinc-kafka-connect-teradata/lib directory mentioned above is for Confluent Platform. If you are using a different installation, find the location where the Confluent Teradata source and sink connector JAR files are located, and place the JDBC driver JAR file(s) for the target databases into the same directory.
- If the JDBC driver is not installed correctly, the Teradata source or sink connector will fail on startup. Typically, the system throws the error No suitable driver found. If this happens, install the JDBC driver again by following the instructions.
- The connector must run on a Connect worker with a default timezone that does not observe daylight saving time. If daylight saving time is observed in your timezone, the connector will fail to configure or start. You can manually override the worker timezone by setting the TZ environment variable before starting the worker, or on the command line with the JVM argument -Duser.timezone=UTC.
Quick Start¶
To see the basic functionality of the connector, you copy Avro data from a single topic to a local Teradata development environment.
Load the Teradata Sink Connector¶
Create a properties file for your Teradata Sink connector:

name=teradata-sink
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
connector.class=io.confluent.connect.teradata.TeradataSinkConnector
tasks.max=1
topics=orders
teradata.url=jdbc:teradata://localhost
teradata.database=dev
teradata.username=dev
teradata.password=dev
pk.mode=kafka
auto.create=true
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Load the teradata-sink connector:

Caution

You must include a double dash (--) between the topic name and your flag. For more information, see this post.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

confluent local services connect connector load teradata-sink --config teradata-sink.properties
Your output should resemble:
{ "name": "teradata-sink", "config": { "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "connector.class": "io.confluent.connect.teradata.TeradataSinkConnector", "tasks.max": "1", "topics": "orders", "teradata.url": "jdbc:teradata://localhost", "teradata.database": "dev", "teradata.username": "dev", "teradata.password": "dev", "pk.mode": "kafka", "auto.create": "true", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "name": "teradata-sink" }, "tasks": [], "type": "sink" }
Tip
For non-CLI users, you can load the Teradata sink connector with the command below.
${CONFLUENT_HOME}/bin/connect-standalone \
${CONFLUENT_HOME}/etc/schema-registry/connect-avro-standalone.properties \
teradata-sink.properties
Produce a record in Teradata¶
Produce a record into the orders topic:

./bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price", "type": "float"}]}'
The console producer waits for input.
Copy and paste the following record into the terminal and press Enter:
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
Log in to the database with the user that you created earlier:
bteq .logon dev dev
In the BTEQ command prompt, you should see that the orders table was automatically created and contains the record:

SELECT * FROM orders;
Your output should resemble:
SELECT * FROM orders;

*** Query completed. 1 rows found. 7 columns returned.
*** Total elapsed time was 1 second.

__connect_topic   product   quantity   __connect_partition   __connect_offset   price                    id
--------------------------------------------------------------------------------------------------------
orders            foo            100                     0                  0   5.00000000000000E 001   999
Tip
If you can’t see all the columns, try setting .width 300 and/or .set foldline.
Troubleshooting¶
Daylight saving time¶
When you run this connector, you might see the following error message.
{
"error_code": 500,
"message": "This Connector must be used on a connect worker with a default timezone which does not observe daylight savings time. This is a functional limitation of the Teradata JDBC driver and has no workaround. On the JVM arguments, specify -Duser.timezone=UTC to override the system default.
...
}
To avoid this error, you must change the default timezone of the Connect worker by adding -Duser.timezone=UTC to the KAFKA_OPTS environment variable. If you start the Kafka Connect worker from the command line, export the KAFKA_OPTS environment variable before starting the worker:
export KAFKA_OPTS="-Duser.timezone=UTC"
connect-distributed -daemon /etc/kafka/connect-distributed.properties
If Kafka Connect is started by systemd, add this to your Kafka Connect service file:
[Service]
...
Environment=KAFKA_OPTS="-Duser.timezone=UTC"
...