Replicator Schema Translation Example for Confluent Platform

This Replicator example showcases the transfer of schemas stored in Confluent Schema Registry from one cluster to another.

Overview

Replicator can translate schema entries from a source Schema Registry to a destination Schema Registry.

This example provides a docker-compose environment with source and destination registries in which schemas are translated. In this example, you create an entry in the source Schema Registry and translate it to the destination.

The scripts directory provides examples of the operations that you must perform to prepare for the translation, as well as the required Replicator configuration in JSON.

Prerequisites

Confluent Platform is supported on various operating systems and software versions (see Supported Versions and Interoperability for Confluent Platform for details). This example has been validated with the specific configuration described below. Windows is not officially supported, but the example may still work there if you update the example code in GitHub by replacing the symlink .env with the contents of config.env.

  • macOS 10.15.3
  • Confluent Platform 7.7.1
  • Java 11.0.6 2020-01-14 LTS
  • bash version 3.2.57
  • jq 1.6
  • (Docker-based examples) Docker version 19.03.8
  • (Docker-based examples) Docker Compose version 1.25.4

Run Example

  1. Clone the confluentinc/examples GitHub repository.

    git clone https://github.com/confluentinc/examples
    
  1. Change directory to the Schema Translation example.

    cd examples/replicator-schema-translation
    
  2. Start the entire example by running a single command that creates source and destination clusters automatically and adds a schema to the source Schema Registry. This takes less than 5 minutes to complete.

    docker-compose up -d
    
  3. Wait at least 2 minutes and then verify the example has completely started by checking the subjects in the source and destination Schema Registry.

    # Source Schema Registry should show one subject, i.e., the output should be ["testTopic-value"]
    docker-compose exec connect curl http://srcSchemaregistry:8085/subjects
    
    # Destination Schema Registry should show no subjects, i.e., the output should be []
    docker-compose exec connect curl http://destSchemaregistry:8086/subjects
    
  4. To prepare for schema translation, put the source Schema Registry in “READONLY” mode and the destination registry in “IMPORT” mode. This works only when the destination Schema Registry has no registered subjects (as is true in this example); otherwise, the import fails with a message similar to “Cannot import since found existing subjects”.

    docker-compose exec connect /etc/kafka/scripts/set_sr_modes_pre_translation.sh
    

    Your output should resemble:

    Setting srcSchemaregistry to READONLY mode:
    {"mode":"READONLY"}
    Setting destSchemaregistry to IMPORT mode:
    {"mode":"IMPORT"}
    
  5. Submit Replicator to perform the translation.

    docker-compose exec connect /etc/kafka/scripts/submit_replicator.sh
    

    Your output should show the posted Replicator configuration. The key setting that enables schema translation is schema.subject.translator.class, which is set to io.confluent.connect.replicator.schemas.DefaultSubjectTranslator:

    {"name":"testReplicator","config":{"connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector","topic.whitelist":"_schemas","topic.rename.format":"${topic}.replica","key.converter":"io.confluent.connect.replicator.util.ByteArrayConverter","value.converter":"io.confluent.connect.replicator.util.ByteArrayConverter","src.kafka.bootstrap.servers":"srcKafka1:10091","dest.kafka.bootstrap.servers":"destKafka1:11091","tasks.max":"1","confluent.topic.replication.factor":"1","schema.subject.translator.class":"io.confluent.connect.replicator.schemas.DefaultSubjectTranslator","schema.registry.topic":"_schemas","schema.registry.url":"http://destSchemaregistry:8086","name":"testReplicator"},"tasks":[],"type":"source"}
    
  6. Verify the schema translation by revisiting the subjects in the source and destination Schema Registries.

    # Source Schema Registry should show one subject, i.e., the output should be ["testTopic-value"]
    docker-compose exec connect curl http://srcSchemaregistry:8085/subjects
    
    # Destination Schema Registry should show one subject, i.e., the output should be ["testTopic.replica-value"]
    docker-compose exec connect curl http://destSchemaregistry:8086/subjects
    
  7. To complete the example, reset both Schema Registries to READWRITE mode. This completes the migration process.

    docker-compose exec connect /etc/kafka/scripts/set_sr_modes_post_translation.sh
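
The contents of the two mode scripts used in steps 4 and 7 are not shown here. As a rough sketch of what such a script might do, the helpers below assume the mode is changed through the Schema Registry REST API's PUT /mode endpoint and that mode changes are enabled on the registry (mode.mutability=true). The function names mode_payload and set_sr_mode are hypothetical and are not taken from the example repository.

```shell
#!/usr/bin/env bash
# Hypothetical helpers; the example's own scripts may differ.

# Build the JSON body for a mode change, rejecting unknown modes.
mode_payload() {
  case "$1" in
    READWRITE|READONLY|IMPORT) printf '{"mode":"%s"}\n' "$1" ;;
    *) echo "unknown mode: $1" >&2; return 1 ;;
  esac
}

# PUT the mode to a Schema Registry base URL.
# Args: base URL (for example http://srcSchemaregistry:8085), mode
set_sr_mode() {
  local url="$1" body
  body="$(mode_payload "$2")" || return 1
  curl -s -X PUT \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data "$body" "${url}/mode"
}
```

Run from inside the connect container, the pre-translation step would then correspond to set_sr_mode http://srcSchemaregistry:8085 READONLY followed by set_sr_mode http://destSchemaregistry:8086 IMPORT, and the post-translation step to calling set_sr_mode with READWRITE for both URLs.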
    
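Steps 3 and 6 rely on a fixed wait before checking the subjects. A polling loop is one possible alternative. The following is a minimal sketch, assuming it is run from the example directory while the containers are up. The function names fetch_subjects and wait_for_subjects, the default of 30 attempts, and the 5-second delay are all hypothetical choices, not part of the example.

```shell
# Hypothetical helpers, not part of the example repository.

# Fetch the subject list from a Schema Registry URL via the connect container.
# -T disables pseudo-TTY allocation so the output can be captured cleanly.
fetch_subjects() {
  docker-compose exec -T connect curl -s "$1/subjects"
}

# Poll until the registry returns exactly the expected JSON array.
# Args: URL, expected output, max attempts (default 30), delay in seconds (default 5)
wait_for_subjects() {
  local url="$1" expected="$2" attempts="${3:-30}" delay="${4:-5}" i=1 actual
  while [ "$i" -le "$attempts" ]; do
    # Strip whitespace so trailing newlines or carriage returns do not matter.
    actual="$(fetch_subjects "$url" 2>/dev/null | tr -d '[:space:]')"
    if [ "$actual" = "$expected" ]; then
      echo "ready after $i attempt(s): $actual"
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  echo "timed out waiting for $expected from $url" >&2
  return 1
}
```

For example, wait_for_subjects http://srcSchemaregistry:8085 '["testTopic-value"]' would return once the source registry lists the test subject, and the same call against http://destSchemaregistry:8086 with '["testTopic.replica-value"]' could be used after submitting Replicator.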

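In step 6 the destination subject is testTopic.replica-value rather than testTopic-value. This is consistent with the topic.rename.format of ${topic}.replica in the posted configuration being applied to the topic portion of the subject name. The sketch below only illustrates that observed renaming for subjects of the form <topic>-key or <topic>-value. It is not Replicator's implementation, and the function name translate_subject is hypothetical.

```shell
# Illustration only: mimics the observed rename, not Replicator's implementation.
# Args: subject, rename format containing the literal placeholder ${topic}
translate_subject() {
  local subject="$1" format="$2" topic suffix
  case "$subject" in
    *-value) suffix="-value" ;;
    *-key)   suffix="-key" ;;
    *)       printf '%s\n' "$subject"; return 0 ;;  # not topic-based: leave unchanged
  esac
  # Drop the -key/-value suffix to recover the topic name.
  topic="${subject%"$suffix"}"
  # Replace the literal ${topic} placeholder, then re-append the suffix.
  # [$] matches a literal dollar sign, so the shell does not expand it.
  printf '%s%s\n' "$(printf '%s' "$format" | sed "s/[$]{topic}/$topic/g")" "$suffix"
}
```

Calling translate_subject testTopic-value '${topic}.replica' prints testTopic.replica-value, which matches the subject listed by the destination Schema Registry in step 6. Note the single quotes around the format, which keep the shell from expanding ${topic}.
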
Tip

This example shows a one-time migration of schemas across self-managed clusters. To configure a continuous migration, the final step would instead set the origin (source) Schema Registry to READWRITE mode and leave the destination in IMPORT mode. Note that this sets up a “one-way” migration; that is, an active-to-passive Replicator setup.
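
As a rough sketch of how the continuous-migration end state could be verified, the helpers below read each registry's mode and succeed only when the source is in READWRITE and the destination is in IMPORT. They assume the Schema Registry REST API's GET /mode endpoint returns a body such as {"mode":"IMPORT"}. The function names get_sr_mode and check_continuous_modes are hypothetical.

```shell
# Hypothetical helpers for verifying the continuous-migration end state.

# Print the current mode of a Schema Registry, for example READWRITE.
# Arg: base URL of the registry
get_sr_mode() {
  curl -s "$1/mode" |
    sed -n 's/.*"mode"[[:space:]]*:[[:space:]]*"\([A-Z]*\)".*/\1/p'
}

# Succeed only if the source is READWRITE and the destination is IMPORT.
# Args: source registry URL, destination registry URL
check_continuous_modes() {
  local src dest
  src="$(get_sr_mode "$1")"
  dest="$(get_sr_mode "$2")"
  echo "source=$src destination=$dest"
  [ "$src" = "READWRITE" ] && [ "$dest" = "IMPORT" ]
}
```

From inside the connect container this could be invoked as check_continuous_modes http://srcSchemaregistry:8085 http://destSchemaregistry:8086, using the hostnames and ports from the steps above.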

Teardown

  1. Stop the example and destroy all local components.

    docker-compose down