カスタム変換¶
用意されている SMT(Single Message Transformation)に必要な変換機能がない場合は、独自に作成することができます。
注意
カスタム変換は現在、マネージド型コネクター では利用できません。
最初に理解しておくべき重要な概念は、一般に、SMT の実装では、抽象クラスのロジックの大部分を提供することです。そのうえで SMT の実装では、Key と Value という具体的な 2 つのサブクラスを提供し、これにより、Connect レコードのキーまたは値を処理するかどうかを指定します。変換を使用する場合、ユーザーは Key または Value クラスの完全修飾クラス名を指定します。
ちなみに
「Kafka Connect で SMT を使用する方法」を参照してください。このチュートリアルでは、独自のカスタム変換の作成について詳しく説明します。
カスタム SMT の作成と使用に必要な手順の概要は、次のとおりです。
デフォルトの Kafka Connect の変換 で使用できるさまざまな SMT のソース Java ファイルを確認します。これらのうちの 1 つを、新しいカスタム変換を作成するためのベースとして利用します。
ソース Java ファイルを確認する際に注意が必要な、重要なメソッドを以下に示します。
apply()
を検索し、このメソッドがどのように実装されているかを確認します。configure()
を検索し、このメソッドがどのように実装されているかを確認します。注釈
詳細については、インターフェイスの変換 を参照してください。
ソースコードを記述してコンパイルし、単体テストを行います。SMT の単体テストのサンプルは、Apache Kafka GitHub プロジェクト に用意されています。
JAR ファイルを作成します。
JAR をインストールします。カスタム SMT の JAR ファイル(変換に必要な Kafka 以外の JAR ファイルがあればそのファイルも)を、Connect のワーカー構成ファイルの
plugin.path
プロパティで指定されているいずれかのディレクトリの 下に あるディレクトリにコピーします。プロパティの例を以下に示します。plugin.path=/usr/local/share/kafka/plugins
たとえば、
/usr/local/share/kafka/plugins
の下にmy-custom-smt
という名前のディレクトリを作成し、このmy-custom-smt
ディレクトリに JAR ファイルをコピーします。重要
すべてのワーカーノードでこの手順を実行します。詳細については、「Connect プラグインのインストール」を参照してください。
ワーカーとコネクターを起動して、カスタム変換を試します。
Connect ワーカーは、変換クラスを見つけるたびに DEBUG レベルでログに出力します。DEBUG モードを有効にし、作成した変換が見つかることを確認します。見つかっていない場合は、JAR のインストールをチェックして、正しい場所にあることを確認します。
注釈
カスタムパーティショナー
デフォルトで、コネクターは、Kafka トピックで使用されているパーティショナーを継承します。コネクターのカスタムパーティショナーを作成することもできます。カスタムパーティショナーは、コネクターの /lib
フォルダーに配置する必要があります。
共通の場所を決めて、パーティショナーを配置することもできます。この代替方法を使用する場合は、各コネクターの /lib
フォルダーからこの場所への symlink を追加します。たとえば、カスタムパーティショナーをパス share/confluent-hub-components/partitioners
に配置するとします。この場合は、symlink share/confluent-hub-components/kafka-connect-s3/lib/partitioners -> ../../partitioners
を追加します。