Teradata Sink Connector for Confluent Platform¶
The Kafka Connect Teradata sink connector allows you to export data from Apache Kafka® topics to Teradata. The connector polls data from Kafka and writes it to the database based on the topic subscription. Table auto-creation and limited auto-evolution are also supported.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Teradata Sink Connector Configuration Properties.
Features¶
Data mapping¶
The sink connector requires knowledge of schemas, so you should use a suitable converter (for example, the Avro converter that comes with Schema Registry or the JSON converter with schemas enabled).
If present, Kafka record keys can be primitive types or a Connect struct. The record value must be a Connect struct.
Fields being selected from Connect structs must be of primitive types.
If the data in the topic is not of a compatible format, implementing a custom Converter may be necessary.
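For example, a connector configuration that uses the JSON converter with schemas enabled (rather than Avro) could include settings along these lines. This is a minimal sketch; producers would then need to write JSON envelopes that embed the schema alongside the payload:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true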
Key handling¶
By default, primary keys are not extracted (pk.mode is set to none). This is not suitable when the connector is responsible for auto-creating the destination table. Different modes can be enabled to use fields from the Kafka record key, the Kafka record value, or the Kafka coordinates for the record. Refer to the primary key configuration options for additional information.
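For example, to derive the primary key from a field of the record value instead of the Kafka coordinates, a configuration could set the following (the id field name is borrowed from the quick start schema later on this page; adjust it to your own data):

pk.mode=record_value
pk.fields=id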
Auto-creation and Auto-evolution¶
Note
The Teradata user requires CREATE TABLE permissions for this feature.
If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing.
The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition.
Primary keys are specified based on the key configuration settings.
If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing.
Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table.
Addition of primary key constraints is also not attempted. In contrast, if auto.evolve is disabled, no evolution is performed and the connector task fails with a missing columns error.
For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema. Default values are also specified based on the default value of the corresponding field (if applicable). We use the following mapping from Connect schema types to Teradata types:
Schema Type | Teradata Type
---|---
INT8 | BYTEINT
INT16 | SMALLINT
INT32 | INTEGER
INT64 | BIGINT
FLOAT32 | FLOAT
FLOAT64 | DOUBLE PRECISION
BOOLEAN | BYTEINT
STRING | LONG VARCHAR CHARACTER SET UNICODE
BYTES | VARBYTE(64000)
'Decimal' | DECIMAL(38,s)
'Date' | DATE
'Time' | TIME WITH TIME ZONE
'Timestamp' | TIMESTAMP WITH TIME ZONE
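As an illustration only, auto-creating a table for the orders value schema used in the quick start below, with pk.mode=record_value and pk.fields=id, would yield DDL roughly like the following; the exact statement the connector issues may differ:

CREATE TABLE orders (
  id INTEGER NOT NULL,
  product LONG VARCHAR CHARACTER SET UNICODE NOT NULL,
  quantity INTEGER NOT NULL,
  price FLOAT NOT NULL,
  PRIMARY KEY (id)
)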
Important
For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.
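For example, adding a hypothetical discount field like the one below to the orders value schema used in the quick start remains backwards compatible, because the field is optional and carries a default value:

{"name": "discount", "type": ["null", "float"], "default": null}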
Quick Start¶
To see the basic functionality of the connector, you will copy Avro data from a single topic to a local Teradata development environment.
Prerequisites¶
- Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Java 1.8
- Teradata 15.00 or above
Note
You must run the Teradata connector with a default timezone that does not observe Daylight Saving Time. This is a functional limitation of the Teradata JDBC driver and has no workaround. We recommend running your Connect workers with the system property -Duser.timezone=UTC set.
- Set up your Teradata Development Environment. Leave the database running while you complete the remainder of this guide.
- Install the Teradata JDBC driver (a sample copy command follows this list).
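One way to install the driver is to copy the Teradata JDBC driver JAR (terajdbc4.jar) into the directory that holds the connector's JARs. The path below assumes a Confluent Hub installation of the connector and should be adjusted to match your environment:

cp terajdbc4.jar <path-to-confluent>/share/confluent-hub-components/confluentinc-kafka-connect-teradata/lib/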
Load the Teradata Sink Connector¶
Create a properties file named teradata-sink.properties for your Teradata sink connector:
name=teradata-sink
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
connector.class=io.confluent.connect.teradata.TeradataSinkConnector
tasks.max=1
topics=orders
teradata.url=jdbc:teradata://localhost
teradata.database=dev
teradata.username=dev
teradata.password=dev
pk.mode=kafka
auto.create=true
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Load the teradata-sink connector:

Caution

You must include a double dash (--) between the connector name and your flag. For more information, see this post.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.

confluent local load teradata-sink -- -d teradata-sink.properties
Your output should resemble:
{ "name": "teradata-sink", "config": { "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "connector.class": "io.confluent.connect.teradata.TeradataSinkConnector", "tasks.max": "1", "topics": "orders", "teradata.url": "jdbc:teradata://localhost", "teradata.database": "dev", "teradata.username": "dev", "teradata.password": "dev", "pk.mode": "kafka", "auto.create": "true", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "name": "teradata-sink" }, "tasks": [], "type": "sink" }
Tip
For non-CLI users, you can load the Teradata sink connector with the command below.
<path-to-confluent>/bin/connect-standalone \
<path-to-confluent>/etc/schema-registry/connect-avro-standalone.properties \
teradata-sink.properties
Produce a Record in Teradata¶
Produce a record into the orders topic:

./bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price", "type": "float"}]}'
The console producer waits for input.
Copy and paste the following record into the terminal and press Enter:
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
Log in to the database as the user that you created earlier:

bteq
.logon dev
dev
At the BTEQ command prompt, you should see that the orders table was automatically created and contains the record:

SELECT * FROM orders;
Your output should resemble:
SELECT * FROM orders;

 *** Query completed. 1 rows found. 7 columns returned.
 *** Total elapsed time was 1 second.

__connect_topic    product    quantity    __connect_partition    __connect_offset    price                    id
--------------------------------------------------------------------------------------------------------
orders             foo             100                      0                   0    5.00000000000000E 001   999
Tip
If you can’t see all the columns, try setting .width 300 and/or .set foldline.
Troubleshooting¶
Daylight Saving Time¶
When you run this connector, you might see the following error message.
{
"error_code": 500,
"message": "This Connector must be used on a connect worker with a default timezone which does not observe daylight savings time. This is a functional limitation of the Teradata JDBC driver and has no workaround. On the JVM arguments, specify -Duser.timezone=UTC to override the system default.
...
}
To avoid this error, you must change the default timezone of the Connect worker by adding -Duser.timezone=UTC to the KAFKA_OPTS environment variable. If you start the Kafka Connect worker from the command line, you can export the KAFKA_OPTS environment variable before starting the worker:
export KAFKA_OPTS="-Duser.timezone=UTC"
connect-distributed -daemon /etc/kafka/connect-distributed.properties
If Kafka Connect is started by systemd, add this to your Kafka Connect service file:
[Service]
…
Environment=KAFKA_OPTS="-Duser.timezone=UTC"
…