Teradata Source Connector for Confluent Platform¶
The Kafka Connect Teradata Source connector allows you to import data from Teradata into Apache Kafka® topics.
Data is loaded by periodically executing a SQL query that creates an output record for each row in the result set. By default, all tables in a database are copied, each to its own output topic. The database is monitored for new or deleted tables and adapts automatically. When copying data from a table, the connector can load only new or modified rows by specifying which columns should be used to detect new or modified data.
You can configure Java streams applications to deserialize and ingest data in multiple ways, including Kafka console producers, JDBC source connectors, and Java client producers. For full code examples, see Pipelining with Kafka Connect and Kafka Streams.
Features¶
The Teradata Source connector includes the following features:
- At least once delivery
- Supports one task
- Table copying
- Incremental query modes
- Message Keys
- Mapping Column Types
- Numeric mapping property
At least once delivery¶
This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
Supports one task¶
The Teradata Source connector can read one or more tables from a single task. In query mode, the connector supports running only one task.
Table copying¶
The source connector supports copying tables with a variety of JDBC data types, adding and removing tables from the database dynamically, whitelists and blacklists, varying polling intervals, and other settings. However, the most important features for most users are the settings controlling how data is incrementally copied from the database.
Kafka Connect tracks the latest record it retrieved from each table, so it can start in the correct location on the next iteration (or in case of a crash). The source connector uses this functionality to only get updated rows from a table (or from the output of a custom query) on each iteration. Several modes are supported, each of which differs in how modified rows are detected.
Incremental query modes¶
Each incremental query mode tracks a set of columns for each row, which it uses
to keep track of which rows have been processed and which rows are new or have
been updated. The mode
setting controls this behavior and supports the
following options:
- Incrementing Column: A single column containing a unique ID for each row, where newer rows are
guaranteed to have larger IDs–that is, an
AUTOINCREMENT
column. Note that this mode can only detect new rows. Updates to existing rows cannot be detected, so this mode should only be used for immutable data. One example where you might use this mode is when streaming fact tables in a data warehouse, since those are typically insert-only. - Timestamp Column: In this mode, a single column containing a modification timestamp is used to track the last time data was processed and to query only for rows that have been modified since that time. Note that because timestamps are not necessarily unique, this mode cannot guarantee all updated data will be delivered. For example, if two rows share the same timestamp and are returned by an incremental query, but only one has been processed before a crash, the second update will be missed when the system recovers.
- Timestamp and Incrementing Columns: This is the most robust and accurate mode, combining an incrementing column with a timestamp column. By combining the two, as long as the timestamp is sufficiently granular, each (id, timestamp) tuple will uniquely identify an update to a row. Even if an update fails after partially completing, unprocessed updates will still be correctly detected and delivered when the system recovers.
- Custom Query: The source connector supports using custom queries instead of copying whole
tables. With a custom query, one of the other update automatic update modes can be used as long
as the necessary
WHERE
clause can be correctly appended to the query. Alternatively, the specified query may handle filtering to new updates itself; however, note that no offset tracking will be performed (unlike the automatic modes whereincrementing
and/ortimestamp
column values are recorded for each record), so the query must track offsets itself. - Bulk: This mode is unfiltered and therefore not incremental at all. It will load all rows from a table on each iteration. This can be useful if you want to periodically dump an entire table where entries are eventually deleted and the downstream system can safely handle duplicates.
Note that all incremental query modes that use certain columns to detect changes will require indexes on those columns to efficiently perform the queries.
For incremental query modes that use timestamps, the source connector uses a configuration
timestamp.delay.interval.ms
to control the waiting period after a row with certain timestamp appears
before you include it in the result. The additional wait allows transactions with earlier timestamps
to complete and the related changes to be included in the result. For more information, see Configuration Properties.
Message Keys¶
Kafka messages are key/value pairs. For a Teradata connector, the value (payload) is the contents of the table row being ingested. However, the Teradata connector does not generate the key by default.
Message keys are useful in setting up partitioning strategies. Keys can direct messages to a specific partition and can support downstream processing where joins are used. If no message key is used, messages are sent to partitions using round-robin distribution.
To set a message key for the Teradata connector, you use two Single Message
Transformations (SMTs): the ValueToKey SMT and the
ExtractField SMT. You add these two SMTs to the Teradata
connector configuration. For example, the following shows a snippet added to a
configuration that takes the id
column of the accounts
table
to use as the message key.
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "teradata_source_01",
"config": {
"connector.class": "io.confluent.connect.teradata.TeradataSourceConnector",
"teradata.url": "jdbc:teradata://localhost",
"teradata.database": "dev",
"teradata.username": "dev",
"teradata.password": "dev",
"topic.prefix": "teradata-01-",
"poll.interval.ms" : 3600000,
"table.whitelist" : "dev.accounts",
"mode":"bulk",
"transforms":"createKey,extractInt",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"id",
"transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field":"id"
}
}'
Mapping Column Types¶
The source connector has a few options for controlling how column types are mapped into Kafka Connect field types. By default, the connector maps SQL/JDBC types to the most accurate representation in Java, which is straightforward for many SQL types but may be a bit unexpected for some types, as described in the following section.
Numeric mapping property¶
SQL’s NUMERIC
and DECIMAL
types have exact semantics controlled by
precision and scale. The most accurate representation for these types is
Connect’s Decimal
logical type which uses Java’s BigDecimal
representation. Avro serializes Decimal
types as bytes that may be difficult
to consume and that may require additional conversion to an appropriate data
type. The source connector’s numeric.mapping
configuration property does this by casting numeric values to the most
appropriate primitive type using the numeric.mapping=best_fit
value. The
following values are available for the numeric.mapping
configuration property:
- none: Use this value if all
NUMERIC
columns are to be represented by the Kafka ConnectDecimal
logical type. This is the default value for this property.Decimal
types are mapped to their binary representation. - best_fit: Use this value if all
NUMERIC
columns should be cast to Connect INT8, INT16, INT32, INT64, or FLOAT64 based upon the column’s precision and scale. This is the property value you should likely use if you haveNUMERIC
/NUMBER
source data. It attempts to mapNUMERIC
columns to the ConnectINT8
,INT16
,INT32
,INT64
, andFLOAT64
primitive type, based upon the column’s precision and scale values, as shown below:
Precision | Scale | Connect primitive type |
---|---|---|
1 to 2 | -84 to 0 | INT8 |
3 to 4 | -84 to 0 | INT16 |
5 to 9 | -84 to 0 | INT32 |
10 to 18 | -84 to 0 | INT64 |
1 to 18 | positive | FLOAT64 |
- precision_only: Use this to map
NUMERIC
columns based only on the column’s precision (assuming that column’s scale is 0). This option attempts to mapNUMERIC
columns to ConnectINT8
,INT16
,INT32
, andINT64
types based only upon the column’s precision, and where the scale is always 0.
Precision | Scale | Connect primitive type |
---|---|---|
1 to 2 | 0 | INT8 |
3 to 4 | 0 | INT16 |
5 to 9 | 0 | INT32 |
10 to 18 | 0 | INT64 |
Tip
For a deeper dive into this topic, see the Confluent blog article Bytes, Decimals, Numerics and oh my.
Note
The numeric.precision.mapping
property is older and is now deprecated.
When enabled, it is equivalent to numeric.mapping=precision_only
. When
not enabled, it is equivalent to numeric.mapping=none
.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license, and for information about the license topic, see License topic configuration.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Properties.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Installing the Teradata Source Connector¶
You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later
- Java 1.8
- Teradata 15.00 or later
- You must run the Teradata connector with a default timezone that does not
observe Daylight Saving Time. This is a functional limitation of the Teradata
JDBC driver and has no workaround. We recommend running your connect workers
with the system property
-Duser.timezone=UTC
set. - The Teradata user must have
RETRIEVE
/SELECT
permissions. - Set up your Teradata Development Environment. Leave the database running while you complete the remainder of this guide.
- If you are running a multi-node Connect cluster, the Teradata connector and JDBC driver JAR must be installed on every Connect worker in the cluster. Install the Teradata JDBC driver.
Installing the connector using Confluent Hub¶
To install the latest
connector version using Confluent Hub Client, navigate to your Confluent Platform installation directory
and run the following command:
confluent-hub install confluentinc/kafka-connect-teradata:latest
You can install a specific version by replacing latest
with a version number
as shown in the following example:
confluent-hub install confluentinc/kafka-connect-teradata:1.0.5
Installing the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Installing JDBC Drivers¶
The Teradata Source and Sink connectors use the Java Database Connectivity (JDBC) API that enables applications to connect to and use a wide range of database systems. In order for this to work, the connectors must have a JDBC Driver for Teradata.
- Navigate to the Teradata Downloads page
- Log in with your Teradata account if you are not signed in already.
- Download the JDBC driver corresponding to your Teradata version.
- Extract the archive to get a
terajdbc4.jar
file. - Place this JAR file into the
share/confluent-hub-components/confluentinc-kafka-connect-teradata/lib
directory in your Confluent Platform installation on each of the Connect worker nodes. - Restart all of the Connect worker nodes.
The rest of this section outlines the specific steps for more common database management systems.
General guidelines¶
The following are additional guidelines to consider:
- The
share/confluent-hub-components/confluentinc-kafka-connect-teradata/lib
directory mentioned above is for Confluent Platform. If you are using a different installation, find the location where the Confluent Teradata source and sink connector JAR files are located, and place the JDBC driver JAR file(s) for the target databases into the same directory. - If the JDBC driver is not installed correctly, the Teradata source or sink connector will fail on startup.
Typically, the system throws the error
No suitable driver found
. If this happens, install the JDBC driver again by following the instructions. - The connector must be run on a connect worker with a default timezone which does not observe daylight savings. If daylight savings is observed in your timezone, then the connector will fail to configure or start. You can manually override the worker timezone by setting the TZ environment variable before starting the worker, or on the command line with the JVM argument -Duser.timezone=UTC
Quick Start¶
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
To see the basic functionality of the connector, you copy a single table from a local Teradata Development Environment database. In this quick start, you can assume each entry in the table is assigned a unique ID and is not modified after creation.
Create Table and Load Data¶
Open Basic Teradata Query (BTEQ) and log in with the development user:
bteq .logon dev dev
In BTEQ, create a table and seed it with some data:
CREATE TABLE accounts ( id INTEGER GENERATED ALWAYS AS IDENTITY NOT NULL, name LONG VARCHAR CHARACTER SET UNICODE ) UNIQUE PRIMARY INDEX (id) ;
INSERT INTO accounts (name) VALUES ('alice');
INSERT INTO accounts (name) VALUES ('bob');
Tip
You can run
SELECT * from accounts;
to verify your table has been created.
Load the Teradata Source Connector¶
Create a properties file for your Teradata Source connector.
name=teradata-source confluent.topic.bootstrap.servers=localhost:9092 confluent.topic.replication.factor=1 connector.class=io.confluent.connect.teradata.TeradataSourceConnector tasks.max=1 teradata.url=jdbc:teradata://localhost teradata.database=dev teradata.username=dev teradata.password=dev mode=incrementing incrementing.column.name=id topic.prefix=test-teradata- table.whitelist=dev.accounts key.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url=http://localhost:8081 value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=http://localhost:8081
Load the
teradata-source
connector:Caution
You must include a double dash (
--
) between the topic name and your flag. For more information, see this post.Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to
confluent local
. For example, the syntax forconfluent start
is nowconfluent local services start
. For more information, see confluent local.confluent local services connect connector load teradata-source --config teradata-source.properties
Your output should resemble:
{ "name": "teradata-source", "config": { "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "connector.class": "io.confluent.connect.teradata.TeradataSourceConnector", "tasks.max": "1", "teradata.url": "jdbc:teradata://localhost", "teradata.database": "dev", "teradata.username": "dev", "teradata.password": "dev", "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "test-teradata-", "table.whitelist": "dev.accounts", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "name": "teradata-source" }, "tasks": [], "type": "source" }
Tip
For non-CLI users, you can load the Teradata sink connector with the command below.
${CONFLUENT_HOME}/bin/connect-standalone \ ${CONFLUENT_HOME}/etc/schema-registry/connect-avro-standalone.properties \ teradata-source.properties
To check that it has copied the data that was present when you started Kafka Connect, start a console consumer, reading from the beginning of the topic:
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test-teradata-accounts --from-beginning {"id":1,"name":{"string":"alice"}} {"id":2,"name":{"string":"bob"}}
The output shows the two records as expected, one per line, in the JSON encoding
of the Avro records. Each row is represented as an Avro record and each column
is a field in the record. You can see both columns in the table, id
and
name
.
The IDs were auto-generated and the column is of type INTEGER NOT
NULL
, which can be encoded directly as an integer. The name
column has
type STRING
and can be NULL
. The JSON encoding of Avro encodes the
strings in the format {"type": value}
, so you can see that both rows have
string
values with the names specified when you inserted the data.
Add a Record to the Consumer¶
Add another record using BTEQ:
INSERT INTO accounts (name) VALUES ('cathy');
You can switch back to the console consumer and see the new record is added and, importantly, the old entries are not repeated:
{"id":3,"name":{"string":"cathy"}}
Note that the default polling interval is five seconds, so it may take a few seconds to show up. Depending on your expected rate of updates or desired latency, a smaller poll interval could be used to deliver updates more quickly.
All Kafka Connect features, including offset
management and fault tolerance, work with the source connector. You can restart
and kill the processes and they will pick up where they left off, copying only
new data (as defined by the mode
setting).
Configuration¶
The source connector gives you quite a bit of flexibility in the databases you can import data from and how that data is imported. This section first describes how to access databases whose drivers are not included with Confluent Platform, then gives a few example configuration files that cover common scenarios, then provides an exhaustive description of the available configuration options.
The full set of configuration options are listed in Configuration Properties, but here are a few template configurations that cover some common usage scenarios.
Use a whitelist to limit changes to a subset of tables in a Teradata database, using id
and
modified
columns that are standard on all whitelisted tables to detect rows that have been
modified. This mode is the most robust because it can combine the unique, immutable row IDs with
modification timestamps to guarantee modifications are not missed even if the process dies in the
middle of an incremental update query.
name=teradata-whitelist-timestamp-source
connector.class=io.confluent.connect.teradata.TeradataSourceConnector
tasks.max=10
teradata.url=jdbc:teradata://localhost
teradata.database=dev
teradata.username=dev
teradata.password=dev
table.whitelist=dev.users,dev.products,dev.transactions
mode=timestamp+incrementing
timestamp.column.name=modified
incrementing.column.name=id
topic.prefix=teradata-
Use a custom query instead of loading tables, allowing you to join data from multiple tables. As long as the query does not include its own filtering, you can still use the built-in modes for incremental queries (in this case, using a timestamp column). Note that this limits you to a single output per connector and because there is no table name, the topic “prefix” is actually the full topic name in this case.
name=teradata-whitelist-timestamp-source
connector.class=io.confluent.connect.teradata.TeradataSourceConnector
tasks.max=10
teradata.url=jdbc:teradata://localhost
teradata.database=dev
teradata.username=dev
teradata.password=dev
query=SELECT users.id, users.name, transactions.timestamp, transactions.user_id, transactions.payment FROM users JOIN transactions ON (users.id = transactions.user_id)
mode=timestamp
timestamp.column.name=timestamp
topic.prefix=teradata-joined-data
Schema Evolution¶
The Teradata connector supports schema evolution when the Avro converter is used. When there is a change in a database table schema, the Teradata connector can detect the change, create a new Connect schema and try to register a new Avro schema in Schema Registry. Whether you can successfully register the schema or not depends on the compatibility level of Schema Registry, which is backward by default.
For example, if you remove a column from a table, the change is backward compatible and the corresponding Avro schema can be successfully registered in Schema Registry. If you modify the database table schema to change a column type or add a column, when the Avro schema is registered to Schema Registry, it will be rejected as the changes are not backward compatible.
You can change the compatibility level of Schema Registry to allow incompatible schemas or other compatibility levels. There are two ways to do this:
- Set the compatibility level for subjects which are used by the connector using
PUT /config/(string: subject)
. The subjects have format oftopic-key
andtopic-value
where thetopic
is determined bytopic.prefix
config and table name. - Configure Schema Registry to use other schema compatibility level by setting
avro.compatibility.level
in Schema Registry. Note that this is a global setting that applies to all schemas in Schema Registry.
However, due to the limitation of the JDBC API, some compatible schema changes may be treated as incompatible change. For example, adding a column with default value is a backward compatible change. However, limitations of the JDBC API make it difficult to map this to default values of the correct type in a Kafka Connect schema, so the default values are currently omitted. The implications is that even some changes of the database table schema is backward compatible, the schema registered in Schema Registry is not backward compatible as it doesn’t contain a default value.
If the Teradata connector is used together with the HDFS connector, there are some restrictions to schema compatibility as well. When Hive integration is enabled, schema compatibility is required to be backward, forward and full to ensure that the Hive schema is able to query the whole data under a topic. As some compatible schema change will be treated as incompatible schema change, those changes will not work as the resulting Hive schema will not be able to query the whole data for a topic.
Troubleshooting¶
Daylight Savings Time¶
When you run this connector, you might see the following error message.
{
"error_code": 500,
"message": "This Connector must be used on a connect worker with a default timezone which does not observe daylight savings time. On the JVM arguments, specify -Duser.timezone=UTC to override the system default.
...
}
Open kafka-run-class
under CONFLUENT_HOME/bin
and look for the # Generic jvm settings you want to add
comment. Add the following lines, and restart Confluent Platform.
if [ -z "$KAFKA_OPTS" ]; then
KAFKA_OPTS="-Duser.timezone=UTC"
fi