JDBC Connector (Source および Sink) for Confluent Platform¶
Kafka Connect JDBC Source Connector を使用すると、データを JDBC ドライバが提供されている各種リレーショナルデータベースから、Apache Kafka® トピックにインポートできます。JDBC Sink Connector を使用すると、データを Kafka トピックから、JDBC ドライバが提供されている各種リレーショナルデータベースにエクスポートできます。JDBC コネクターでは、さまざまなデータベースがサポートされており、各データベースのカスタムコードも必要ありません。
JDBC コネクターのインストール¶
このコネクターは、こちらの手順 でインストールできます。ZIP ファイルを手動でダウンロードすることもできます。
Confluent Hub を使用したコネクターのインストール¶
- 前提条件
- Confluent Hub クライアント をインストールしておく必要があります。これは Confluent Enterprise と一緒にデフォルトでインストールされます。
Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行して、最新( latest
)バージョンのコネクターをインストールします。コネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
confluent-hub install confluentinc/kafka-connect-jdbc:latest
特定のバージョンをインストールするには、latest
をバージョン番号に置き換えます。以下に例を示します。
confluent-hub install confluentinc/kafka-connect-jdbc:10.0.0
注釈
複数ノードの Connect クラスターを実行している場合、JDBC コネクターおよび JDBC ドライバ JAR をクラスター内の各 Connect ワーカーにインストールする必要があります。以下に詳細を示します。
コネクターの手動インストール¶
コネクターの ZIP ファイルをダウンロードして展開 します。コネクターの手動インストールの 手順 に従ってください。
ライセンス¶
Confluent コミュニティライセンス を取得することで、このコネクターを利用できます。
構成プロパティ¶
ソースコネクターの構成プロパティの網羅的なリストについては、「 JDBC Connector Source Connector 構成プロパティ」を参照してください。
シンクコネクターの構成プロパティの網羅的なリストについては、「 JDBC Sink Connector 構成プロパティ」を参照してください。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター 」を参照してください。
JDBC ドライバのインストール¶
JDBC Source Connector と JDBC Sink Connector で Java Database Connectivity(JDBC)API を採用することで、アプリケーションがさまざまなデータベースシステムに接続して使用できるようになります。これを行うためには、使用する特定のデータベースシステム用の JDBC ドライバをコネクターに用意する必要があります。
コネクターには、いくつかのデータベースシステム用の JDBC ドライバが用意されていますが、その他のデータベースシステムでコネクターを使用するには、そのデータベースシステム用の最新の JDBC 4.0 ドライバをインストールしておく必要があります。詳細は JDBC ドライバによって異なりますが、基本的な手順は以下のとおりです。
- 使用する各データベースシステム用の JDBC 4.0 ドライバの JAR ファイルを見つけます。
- これらの JAR ファイルを、各 Connect ワーカーノードで Confluent Platform のインストール環境の
share/java/kafka-connect-jdbc
ディレクトリに配置します。 - Connect ワーカーノードをすべて再起動します。
このセクションの残りの部分では、その他の一般的なデータベース管理システムについて具体的な手順を概説します。
一般的なガイドライン¶
考慮する必要のある追加のガイドラインを以下に示します。
- 使用可能な最新バージョンの JDBC 4.0 ドライバを使用します。最新バージョンの JDBC ドライバでは、データベース管理システムのほとんどのバージョンがサポートされ、多くのバグが修正されています。
- Connect ワーカーの実行に使用する Java バージョンに合った JAR ファイルを使用します。一部の JDBC ドライバには、複数の Java バージョンで動作する単一の JAR があります。その他のドライバには、Java 8 用の JAR とは別に、Java 10 用または Java 11 用の JAR があります。使用する Java バージョン用の正しい JAR ファイルを使用してください。JDBC ドライバの JAR ファイルをインストールして使用するとき、現在の Java バージョンがこの JAR ファイルでサポートされていないと、JDBC Source Connector または JDBC Sink Connector の起動が
UnsupportedClassVersionError
によって失敗する可能性があります。この場合は、インストールした JDBC ドライバの JAR ファイルを削除し、正しい JAR ファイルを使用してドライバインストールプロセスを繰り返します。 - 上記の
share/java/kafka-connect-jdbc
ディレクトリは Confluent Platform 用です。別のインストール環境を使用する場合は、Confluent JDBC Source Connector と Confluent JDBC Sink Connector の JAR ファイルがある場所を見つけ、その同じディレクトリにターゲットデータベース用の JDBC ドライバの JAR ファイルを配置してください。 - データベース管理システムに固有の JDBC ドライバが正しくインストールされていない場合、JDBC Source Connector または JDBC Sink Connector は起動に失敗します。通常、システムはエラー
No suitable driver found
をスローします。これが発生した場合は、もう一度手順に従って JDBC ドライバをインストールしてください。
Microsoft SQL Server¶
JDBC Source Connector と JDBC Sink Connector には、Microsoft SQL Server に対して読み書きするために、オープンソースの jTDS JDBC ドライバ とオープンソースの Microsoft JDBC ドライバ が含まれています。JDBC 4.0 ドライバが含まれているため、Microsoft SQL Server に対してコネクターを実行する前に追加手順は必要ありません。
なんらかの理由で Microsoft JDBC ドライバを手動でインストールする必要がある場合は、以下を実行します。
- 最新バージョンの JDBC ドライバアーカイブ(たとえば、英語の場合は
sqljdbc_7.2.2.0_enu.tar.gz
)をダウンロードします。 - ファイルのコンテンツを一時ディレクトリに展開して、使用している Java バージョンに合った適切な JAR ファイルを探します。たとえば、7.2.2.0 バージョンのドライバをダウンロードした場合は、以下の JAR ファイルを探します。
mssql-jdbc-7.2.2.jre8.jar
(Java 8 で Connect を実行する場合)。mssql-jdbc-7.2.2.jre11.jar
(Java 11 で Connect を実行する場合)。
- JDBC Source Connector または JDBC Sink Connector をデプロイする前に、各 Connect ワーカーノードで以下の手順を実行します。
- 既存の
share/java/kafka-connect-jdbc/jtds-1.3.1.jar
ファイルを Confluent Platform インストールから削除します。 - Confluent Platform インストール環境の
share/java/kafka-connect-jdbc/
ディレクトリに JAR ファイルをインストールします。 - Connect ワーカーを再起動します。
- 既存の
現在の Java バージョンをサポートしていない JDBC ドライバの JAR ファイルをインストールして、SQL Server データベースを使用する JDBC Source Connector または JDBC Sink Connector を起動しようとすると、コネクターが UnsupportedClassVersionError
によって失敗する可能性があります。この場合は、JDBC ドライバの JAR ファイルを削除し、正しい JAR ファイルを使用してドライバインストールプロセスを繰り返します。
Kerberos 認証¶
JDBC Source Connector および JDBC Sink Connector は、Kerberos を使用して SQL Server で認証を行うことができます。Kerberos は、JDBC Source Connector または JDBC Sink Connector が実行される各コネクターワーカーにインストールして構成する必要があります。SQL Server での Kerberos の構成の詳細については、Microsoft のドキュメント を参照してください。
KDC に Kerberos プリンシパルをセットアップしたら、新しい JDBC Source Connector または JDBC Sink Connector の構成に以下のプロパティを追加します。
connection.authenticationScheme=JavaKerberos
connection.integratedSecurity=true
connection.userName=user@REALM
connection.password=*****
ユーザーが krb5.conf
ファイルの default_realm
セットに属している場合は、connection.userName
プロパティの値を REALM
にする必要はありません。これらのプロパティがコネクターとともに使用されると、指定した Kerberos プリンシパルとパスワードを使用して、SQL Server への接続が確立されます。
注釈
JDBC Source および JDBC Sink の connection.user
構成のプロパティは、Kerberos では使用されません。代わりに connection.userName
プロパティが使用されます。
Kerberos 認証の例外¶
Kafka クラスターが Kerberos(SASL_SSL)で保護されている場合、統合された Kerberos 認証が構成されている Microsoft SQL Server データベースにコネクターでアクセスすると、以下の例外が発生します。
connect-distributed: Caused by: javax.security.auth.login.LoginException: Unable to obtain Principal Name for authentication
この例外を解決するには、Connect プロパティファイルを読み込む前に、システムプロパティを変更する必要があります。以下を入力します。
export KAFKA_OPTS="-Djavax.security.auth.useSubjectCredsOnly=false"
bin/connect-distributed etc/kafka/connect-distributed.properties
このシステムプロパティの詳細については、Oracle のドキュメント を参照してください。
PostgreSQL データベース¶
JDBC Source Connector と JDBC Sink Connector には、PostgreSQL データベースサーバーに対して読み書きするために、オープンソースの PostgreSQL JDBC 4.0 ドライバ が含まれています。JDBC 4.0 ドライバが含まれているため、PostgreSQL データベースに対してコネクターを実行する前に追加手順は必要ありません。
Oracle Database¶
Oracle は、Oracle 用の JDBC ドライバ をいくつか提供しています。最新バージョンを見つけて ojdbc8.jar
(Java 8 で Connect を実行する場合)または ojdbc10.jar
(Java 11 で Connect を実行する場合)をダウンロードします。ダウンロードした JAR ファイルを Confluent Platform インストール環境の share/java/kafka-connect-jdbc
ディレクトリに配置し、すべての Connect ワーカーノードを再起動します。
JDBC ドライバとコンパニオン JAR を含む tar.gz
ファイルをダウンロードした場合は、tar.gz
のファイルコンテンツを一時ディレクトリに展開し、readme ファイルを使用して必要な JAR ファイルを特定します。各 Connect ワーカーノードで、JDBC ドライバの JAR ファイルとその他の必要なコンパニオン JAR ファイルを Confluent Platform インストール環境の share/java/kafka-connect-jdbc
ディレクトリにコピーし、すべての Connect ワーカーノードを再起動します。
現在の Java バージョンをサポートしていない JDBC ドライバの JAR ファイルをインストールして、Oracle データベースを使用する JDBC Source Connector または JDBC Sink Connector を起動しようとすると、コネクターが UnsupportedClassVersionError
によって失敗する可能性があります。この場合は、JDBC ドライバの JAR ファイルを削除し、正しい JAR ファイルを使用してドライバインストールプロセスを繰り返します。
Kerberos 認証¶
JDBC Source Connector および JDBC Sink Connector は、Kerberos を使用して Oracle で認証を行うことができます。Kerberos は、JDBC Source Connector または JDBC Sink Connector が実行される各 Connect ワーカーにインストールして構成する必要があります。Oracle での Kerberos の構成の詳細については、Oracle のドキュメント を参照してください。
KDC に Kerberos プリンシパルをセットアップしたら、okinit
を実行して、各 Connect ワーカーで初期チケットをリクエストします。
okinit <username>
認証情報を入力します。
チケットキャッシュを見つけます(デフォルトでは /tmp/krb5cc_<uid>
にあります)。下記のコマンドを使用してチケットの一覧を表示し、チケットキャッシュの場所を確認します。
oklist -f
以上の後、新しい JDBC Source Connector または JDBC Sink Connector の構成に以下のプロパティを追加します。
connection.oracle.net.authentication_services=(KERBEROS5)
connection.oracle.net.kerberos5_mutual_authentication=true
connection.oracle.net.kerberos5_cc_name=/tmp/krb5cc_5088
最後のプロパティの値には、各マシンでのチケットキャッシュの場所を指定します。これらのプロパティにより、Kerberos を使用したデータベースでの認証が Oracle JDBC ドライバに指示されます。
注釈
以下の JDBC Source Connector および JDBC Sink Connector の構成のプロパティは、Kerberos を使用する場合には必要ありません。
connection.user
connection.password
トラブルシューティングについては、Oracle のドキュメント を参照してください。
IBM DB2¶
IBM は、DB2 のバージョンに合わせて多数の DB2 用の JDBC ドライバ を提供しています。通常は、最新の JDBC 4.0 ドライバを選択し、いずれかのダウンロードオプションを選択します。ダウンロードした tar.gz
ファイルを展開し、その中の db2jcc4.jar
ファイルを見つけて、db2jdcc4.jar
ファイル "のみ" を Confluent Platform インストール環境の share/java/kafka-connect-jdbc
ディレクトリに配置します。
たとえば、圧縮された tar.gz
ファイル( v10.5fp10_jdbc_sqlj.tar.gz
など)をダウンロードした場合は、以下の手順を実行します。
tar.gz
ファイルのコンテンツを一時ディレクトリに展開します。- 展開したファイルから ZIP ファイル(
db2_db2driver_for_jdbc_sqlj
など)を見つけます。 zip
ファイルのコンテンツを別の一時ディレクトリに展開します。db2jdcc4.jar
ファイルを見つけたら、各 Connect ワーカーノードで、Confluent Platform インストール環境のshare/java/kafka-connect-jdbc
ディレクトリにファイルをコピーして、すべての Connect ワーカーノードを再起動します。- 2 つの一時ディレクトリを削除します。
注釈
IBM ダウンロードに含まれる他のファイルを、Confluent Platform インストール環境の share/java/kafka-connect-jdbc
ディレクトリに配置しないでください。AS/400 で実行されている DB2 の UPSERT は、Confluent JDBC コネクターでは現在サポートされていません。
MySQL サーバー¶
MySQL は、さまざまなプラットフォームに対して MySQL 用の Connect/J JDBC ドライバ を提供しています。Platform Independent オプションを選択し、Compressed TAR Archive をダウンロードします。このファイルには、JAR ファイルとソースコードの両方が含まれています。
この tar.gz
ファイルのコンテンツを一時ディレクトリに展開します。展開したファイルには jar
ファイルが 1 つ含まれます(たとえば、mysql-connector-java-8.0.16.jar
)。各 Connect ワーカーノードで、"この JAR ファイルのみ" を Confluent Platform インストール環境の share/java/kafka-connect-jdbc
ディレクトリにコピーし、すべての Connect ワーカーノードを再起動します。
SAP HANA¶
SAP は SAP HANA JDBC ドライバ を提供しています。これは Maven Central で入手できます。最新バージョンの JAR ファイル(たとえば、ngdbc-2.4.56.jar
)をダウンロードし、各 Connect ワーカーノードで Confluent Platform インストール環境の share/java/kafka-connect-jdbc
ディレクトリに配置して、すべての Connect ワーカーノードを再起動します。
SQLite 組み込みデータベース¶
JDBC Source Connector と JDBC Sink Connector には、ローカルの SQLite データベースに対して読み書きするために、オープンソースの SQLite JDBC 4.0 ドライバ が含まれています。ローカル SQLite 組み込みデータベースを使用するのは、主に開発テストのためです。
その他のデータベース¶
その他のデータベース用の JDBC 4.0 ドライバの JAR ファイルを見つけたら、必要な JAR ファイルのみを各 Connect ワーカーノードの Confluent Platform インストール環境の share/java/kafka-connect-jdbc
ディレクトリに配置し、すべての Connect ワーカーノードを再起動します。