カスタム変換

用意されている SMT(Single Message Transformation)に必要な変換機能がない場合は、独自に作成することができます。

注意

カスタム変換は現在、マネージド型コネクター では利用できません。

最初に理解しておくべき重要な概念は、一般に、SMT の実装では、抽象クラスのロジックの大部分を提供することです。そのうえで SMT の実装では、KeyValue という具体的な 2 つのサブクラスを提供し、これにより、Connect レコードのキーまたは値を処理するかどうかを指定します。変換を使用する場合、ユーザーは Key または Value クラスの完全修飾クラス名を指定します。

ちなみに

Kafka Connect で SMT を使用する方法」を参照してください。このチュートリアルでは、独自のカスタム変換の作成について詳しく説明します。

カスタム SMT の作成と使用に必要な手順の概要は、次のとおりです。

  1. デフォルトの Kafka Connect の変換 で使用できるさまざまな SMT のソース Java ファイルを確認します。これらのうちの 1 つを、新しいカスタム変換を作成するためのベースとして利用します。

    ソース Java ファイルを確認する際に注意が必要な、重要なメソッドを以下に示します。

    • apply() を検索し、このメソッドがどのように実装されているかを確認します。

    • configure() を検索し、このメソッドがどのように実装されているかを確認します。

      注釈

      詳細については、インターフェイスの変換 を参照してください。

  2. ソースコードを記述してコンパイルし、単体テストを行います。SMT の単体テストのサンプルは、Apache Kafka GitHub プロジェクト に用意されています。

  3. JAR ファイルを作成します。

  4. JAR をインストールします。カスタム SMT の JAR ファイル(変換に必要な Kafka 以外の JAR ファイルがあればそのファイルも)を、Connect のワーカー構成ファイルの plugin.path プロパティで指定されているいずれかのディレクトリの 下に あるディレクトリにコピーします。プロパティの例を以下に示します。

    plugin.path=/usr/local/share/kafka/plugins
    

    たとえば、/usr/local/share/kafka/plugins の下に my-custom-smt という名前のディレクトリを作成し、この my-custom-smt ディレクトリに JAR ファイルをコピーします。

    重要

    すべてのワーカーノードでこの手順を実行します。詳細については、「Connect プラグインのインストール」を参照してください。

  5. ワーカーとコネクターを起動して、カスタム変換を試します。

    Connect ワーカーは、変換クラスを見つけるたびに DEBUG レベルでログに出力します。DEBUG モードを有効にし、作成した変換が見つかることを確認します。見つかっていない場合は、JAR のインストールをチェックして、正しい場所にあることを確認します。

注釈

カスタムパーティショナー

デフォルトで、コネクターは、Kafka トピックで使用されているパーティショナーを継承します。コネクターのカスタムパーティショナーを作成することもできます。カスタムパーティショナーは、コネクターの /lib フォルダーに配置する必要があります。

共通の場所を決めて、パーティショナーを配置することもできます。この代替方法を使用する場合は、各コネクターの /lib フォルダーからこの場所への symlink を追加します。たとえば、カスタムパーティショナーをパス share/confluent-hub-components/partitioners に配置するとします。この場合は、symlink share/confluent-hub-components/kafka-connect-s3/lib/partitioners -> ../../partitioners を追加します。