Replicator スキーマ変換のサンプル

この Replicator サンプルでは、Confluent Schema Registry に格納されているスキーマの、1 つのクラスターから別のクラスターへの移動を紹介します。

概要

Replicator は、ソース Schema Registry からデスティネーション Schema Registry へとエントリを変換する機能を備えています。

このサンプルでは、ソースレジストリとデスティネーションレジストリが存在する docker-compose 環境を提供します。ここで、スキーマが変換されます。このサンプルでは、ソース Schema Registry でエントリを作成し、それをデスティネーションへと変換します。

scripts ディレクトリには、変換を準備するために実行する必要のある操作のサンプルと、必要な JSON Replicator 構成があります。

前提条件

Confluent Platform は、さまざまなオペレーティングシステムおよびソフトウェアバージョンでサポートされています(詳細については「サポートされているバージョンおよび相互運用性」を参照)。このサンプルは、以下に説明する特定の構成で検証されています。この例の Windows での実行は正式にサポートされていませんが、GitHub のサンプルコードを変更して symlink .envconfig.env の内容で置き換えると、Windows でも動作する可能性があります。

  • macOS 10.15.3
  • Confluent Platform 7.1.1
  • Java 11.0.6 2020-01-14 LTS
  • bash バージョン 3.2.57
  • jq 1.6
  • (Docker ベースのサンプル)Docker バージョン 19.03.8
  • (Docker ベースのサンプル)Docker Compose docker-compose バージョン 1.25.4

サンプルの実行

  1. confluentinc/examples GitHub リポジトリのクローンを作成します。

    git clone https://github.com/confluentinc/examples
    
  1. スキーマ変換のサンプルのディレクトリに変更します。

    cd examples/replicator-schema-translation
    
  2. 1 つのコマンドを実行してサンプル全体を起動します。このコマンドにより、送信元クラスターと送信先クラスターが自動的に作成され、ソース Schema Registry にスキーマが追加されます。これは、5 分以内に完了します。

    docker-compose up -d
    
  3. 2 分以上待機した後に、ソースおよびデスティネーション Schema Registry でサブジェクトをチェックして、サンプルが完全に起動したことを検証します。

    # Source Schema Registry should show one subject, i.e., the output should be ["testTopic-value"]
    docker-compose exec connect curl http://srcSchemaregistry:8085/subjects
    
    # Destination Schema Registry should show no subjects, i.e., the output should be []
    docker-compose exec connect curl http://destSchemaregistry:8086/subjects
    
  4. スキーマ変換の準備を行うには、ソース Schema Registry を "READONLY" モードに、デスティネーションレジストリを "IMPORT" モードにします。これは、(この例のように)デスティネーション Schema Registry にサブジェクトが登録されていない場合にのみ機能することに注意してください。サブジェクトが登録されていると、"Cannot import since found existing subjects" のようなメッセージが表示されてインポートに失敗します。

    docker-compose exec connect /etc/kafka/scripts/set_sr_modes_pre_translation.sh
    

    出力は以下のようになります。

    Setting srcSchemaregistry to READONLY mode:
    {"mode":"READONLY"}
    Setting destSchemaregistry to IMPORT mode:
    {"mode":"IMPORT"}
    
  5. Replicator を送信して変換を実行します。

    docker-compose exec connect /etc/kafka/scripts/submit_replicator.sh
    

    出力には、提示された Replicator 構成が表示されるはずです。スキーマ変換を可能にするキー構成は、次のとおりです。 schema.subject.translator.class=io.confluent.connect.replicator.schemas.DefaultSubjectTranslator

    {"name":"testReplicator","config":{"connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector","topic.whitelist":"_schemas","topic.rename.format":"${topic}.replica","key.converter":"io.confluent.connect.replicator.util.ByteArrayConverter","value.converter":"io.confluent.connect.replicator.util.ByteArrayConverter","src.kafka.bootstrap.servers":"srcKafka1:10091","dest.kafka.bootstrap.servers":"destKafka1:11091","tasks.max":"1","confluent.topic.replication.factor":"1","schema.subject.translator.class":"io.confluent.connect.replicator.schemas.DefaultSubjectTranslator","schema.registry.topic":"_schemas","schema.registry.url":"http://destSchemaregistry:8086","name":"testReplicator"},"tasks":[],"type":"source"}
    
  6. ソースおよびデスティネーション Schema Registry のサブジェクトに再びアクセスしてスキーマ変換を検証します。

    # Source Schema Registry should show one subject, i.e., the output should be ["testTopic-value"]
    docker-compose exec connect curl http://srcSchemaregistry:8085/subjects
    
    # Destination Schema Registry should show one subject, i.e., the output should be ["testTopic.replica-value"]
    docker-compose exec connect curl http://destSchemaregistry:8086/subjects
    
  7. サンプルを完了するには、両方の Schema Registry を READWRITE モードにします。これで移行処理が完了します。

    docker-compose exec connect /etc/kafka/scripts/set_sr_modes_post_translation.sh
    

ちなみに

このサンプルは、複数の自己管理型クラスターにわたるスキーマの "1 回のみの移行" を示しています。"継続的移行" を構成するには、最後の手順として、オリジン(ソース)|sr| を READWRITE モードに設定し、デスティネーションを IMPORT モードのままにします。これにより、"一方向"の移行がセットアップされます。つまり、アクティブ/パッシブ Replicator セットアップです。

終了後の手順

  1. サンプルを停止し、すべてのローカルコンポーネントを破棄します。

    docker-compose down