Teradata Sink Connector for Confluent Platform¶
Kafka Connect Teradata Sink Connector を使用すると、Apache Kafka® のトピックから Teradata にデータをエクスポートできます。コネクターによって、Kafka からデータがポーリングされ、トピックのサブスクリプションに基づいてデータベースに書き込まれます。テーブルの自動作成と、制限付きの自動進化もサポートされています。
機能¶
Teradata Sink Connector には、以下の機能があります。
少なくとも 1 回のデリバリー¶
このコネクターによって、Kafka のトピックのレコードが少なくとも 1 回は配信されることが保証されます。
複数のタスク¶
Teradata Sink Connector は、1 つまたは複数のタスクの実行をサポートしています。タスクの数は tasks.max
構成パラメーターで指定できます。これにより、複数のファイルを解析する必要がある場合に、パフォーマンスが大幅に向上する可能性があります。
データマッピング¶
Teradata Sink Connector はスキーマを認識していないため、適切なコンバーター(Schema Registry に付属の Avro コンバーターや、スキーマに対応した JSON コンバーターなど)を使用する必要があります。Kafka レコードキーがある場合は、プリミティブ型、または Connect 構造体にすることができます。レコード値は Connect 構造体にする必要があります。Connect 構造体から選択されるフィールドは、プリミティブ型にする必要があります。トピックのデータが互換性のあるフォーマットでない場合は、カスタムの Converter
の実装が必要になることがあります。
キー処理¶
デフォルトで、プライマリキーは pk.mode
が none
に設定された状態では抽出されません。送信先テーブルがコネクターによって自動生成される場合、これは適していません。Kafka レコードキー、Kafka レコード値、またはレコードの Kafka 座標からフィールドを使用できるようにする別のモードがあります。詳細については、プライマリキーの構成オプション を参照してください。
自動作成と自動進化¶
注釈
この機能を使用する Teradata ユーザーには CREATE TABLE
アクセス許可が必要です。
auto.create
を有効にすると、送信先テーブルが存在しない場合に、コネクターでテーブルを CREATE することができます。コネクターはテーブル定義のベースとしてレコードスキーマを使用するので、作成は、レコードをトピックから取り込みながらオンラインで実行されます。プライマリキーは、キーの 構成設定 に基づいて指定されます。
auto.evolve
を有効にすると、レコードに列が存在しない場合に、コネクターで送信先テーブルに対して ALTER を実行して、制限付きで自動進化を実行することができます。データ型の変更と列の削除は障害につながる可能性があるので、その種の進化をコネクターがテーブルに対して実行することはありません。プライマリキー制約の追加も試行されません。一方、auto.evolve
を無効にすると、進化は実行されず、コネクタータスクは、列が存在しないことを示すエラーで失敗します。
自動作成および自動進化のどちらについても、列の NULL 許容性は、スキーマ内の対応するフィールドのオプションによって決まります。また、デフォルト値も対応するフィールドのデフォルト値によって決まります(該当する場合)。Connect スキーマの型から Teradata の型への以下のマッピングを使用します。
スキーマの型 | Teradata |
---|---|
INT8 | BYTEINT |
INT16 | SMALLINT |
INT32 | INTEGER |
INT64 | BIGINT |
FLOAT32 | FLOAT |
FLOAT64 | DOUBLE PRECISION |
BOOLEAN | BYTEINT |
STRING | LONG VARCHAR CHARACTER SET UNICODE |
BYTES | VARBYTES(64000) |
'Decimal' | DECIMAL(38,s) |
'Date' | DATE |
'Time' | TIME WITH TIME ZONE |
'Timestamp' | TIMESTAMP WITH TIME ZONE |
重要
テーブルスキーマ進化の後方互換性を維持するため、レコードスキーマの新規フィールドは、省略可能にするか、デフォルト値を設定する必要があります。フィールドを削除する必要がある場合は、テーブルのスキーマを手動で変更して、対応する列をドロップするか、デフォルト値を割り当てるか、NULL 許容にする必要があります。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「Teradata Sink Connector 構成プロパティ」を参照してください。
クイックスタート¶
コネクターの基本機能を確認するには、Avro データを単一のトピックからローカルの Teradata 開発環境にコピーします。
前提条件¶
- Confluent Platform 3.3.0 以上、または Kafka 0.11.0 以上
- Java 1.8
- Teradata 15.00 以上
注釈
Teradata コネクターは、夏時間ではないデフォルトタイムゾーンを使用して実行する必要があります。これは Teradata JDBC ドライバの機能制限であり、回避策はありません。Connect ワーカーは、システムプロパティ -Duser.timezone=UTC を設定した状態で実行することをお勧めします。
- Teradata 開発環境 をセットアップします。以降、このガイドの完了までデータベースは実行させたままにします。
- Teradata JDBC ドライバ をインストールします。
Teradata Sink Connector を読み込みます。¶
Teradata Sink Connector のプロパティファイルを作成します。
name=teradata-sink confluent.topic.bootstrap.servers=localhost:9092 confluent.topic.replication.factor=1 connector.class=io.confluent.connect.teradata.TeradataSinkConnector tasks.max=1 topics=orders teradata.url=jdbc:teradata://localhost teradata.database=dev teradata.username=dev teradata.password=dev pk.mode=kafka auto.create=true key.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url=http://localhost:8081 value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=http://localhost:8081
teradata-sink
コネクターを読み込みます。注意
トピック名とフラグの間には、ダブルダッシュ(
--
)を入れる必要があります。詳細については、こちらの投稿 を参照してください。ちなみに
Confluent CLI 開発用コマンドのコマンド構文は、5.3.0 で変更されました。該当するコマンドは
confluent local
に移行されています。たとえば、confluent start
の構文は、confluent local services start
に変わりました。詳しくは、「confluent local」を参照してください。confluent local services connect connector load teradata-sink --config teradata-sink.properties
出力は以下のようになります。
{ "name": "teradata-sink", "config": { "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "connector.class": "io.confluent.connect.teradata.TeradataSinkConnector", "tasks.max": "1", "topics": "orders", "teradata.url": "jdbc:teradata://localhost", "teradata.database": "dev", "teradata.username": "dev", "teradata.password": "dev", "pk.mode": "kafka", "auto.create": "true", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "name": "teradata-sink" }, "tasks": [], "type": "sink" }
ちなみに
CLI 以外のユーザーは、以下のコマンドを使用して Teradata Sink Connector を読み込むことができます。
<path-to-confluent>/bin/connect-standalone \ <path-to-confluent>/etc/schema-registry/connect-avro-standalone.properties \ teradata-sink.properties
Teradata でのレコードの生成¶
orders
トピックにレコードを生成します。./bin/kafka-avro-console-producer \ --broker-list localhost:9092 --topic orders \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price", "type": "float"}]}'
コンソールプロデューサーが、入力を待機します。
以下のレコードをターミナルにコピーアンドペーストして、Enter キーを押します。
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
前に作成したデータベースユーザーにログインします。
bteq .logon dev dev
BTEQ コマンドプロンプトで、
orders
テーブルが自動的に作成され、レコードが含まれていることを確認できます。SELECT * FROM orders;
出力は以下のようになります。
SELECT * FROM orders; *** Query completed. 1 rows found. 7 columns returned. *** Total elapsed time was 1 second. __connect_topic product quantity __connect_partition __connect_offset price id -------------------------------------------------------------------------------------------------------- orders foo 100 0 0 5.00000000000000E 001 999
ちなみに
表示されない列がある場合は、
.width 300
や.set foldline
を設定してみてください。
トラブルシューティング¶
夏時間¶
このコネクターの実行中に、以下のエラーメッセージが表示されることがあります。
{
"error_code": 500,
"message": "This Connector must be used on a connect worker with a default timezone which does not observe daylight savings time. This is a functional limitation of the Teradata JDBC driver and has no workaround. On the JVM arguments, specify -Duser.timezone=UTC to override the system default.
...
}
このエラーを回避するには、-Duser.timezone=UTC を KAFKA_OPTS 環境変数に追加して、Connect ワーカーのデフォルトタイムゾーンを変更する必要があります。Kafka Connect ワーカーをコマンドラインから起動する場合は、KAFKA_OPTS 環境変数をエクスポートしてから Kafka Connect ワーカーを起動します。
- ::
- export KAFKA_OPTS="-Duser.timezone=UTC" connect-distributed -daemon /etc/kafka/connect-distributed.properties
Kafka Connect が systemd によって起動される場合は、Kafka Connect サービスファイルに以下を追加します。
- ::
- [Service] …Environment=KAFKA_OPTS="-Duser.timezone=UTC" …