Tutorial: Use Schema Registry on Confluent Platform to Implement Schemas for a Client Application

This tutorial provides a step-by-step workflow for using Confluent Schema Registry on Confluent Platform. You will learn how to enable client applications to read and write Avro data, check compatibility as schemas evolve, and use Confluent Control Center, which has integrated capabilities with Schema Registry.

See also

For an example that shows how to set Docker environment variables for Confluent Platform running in ZooKeeper mode, see the Confluent Platform demo. Refer to the demo’s docker-compose.yml file for a configuration reference.

Terminology

First let us levelset on terminology, and answer the question: What is a topic versus a schema versus a subject?

Topic

A Kafka topic contains messages, and each message is a key-value pair. The key acts as an identifier (for example a message key may be a two-digit number), while the message contains actionable data (such as a customer name or address). Message keys are not required and not always used.

Message Keys and Values

Either the message key or the message value, or both, can be serialized as Avro, JSON, or Protobuf. For example, the message value may be using an Avro record, while the key may be a primitive (string, integer, and so forth). Typically message keys, if used, are primitives, but they can be complex data types as well (for example, record or array). How you set the key is up to you and the requirements of your implementation. For detailed examples of key and value schemas, see the discussion under Formats, Serializers, and Deserializers in the Schema Registry documentation.

Schemas and Subjects

A schema defines the structure of the data format. The Kafka topic name can be independent of the schema name. Schema Registry defines a scope in which schemas can evolve, and that scope is the subject. The name of the subject depends on the configured subject name strategy, which by default is set to derive subject name from topic name. You can modify the subject name strategy on a per-topic basis. See Change the subject naming strategy for a topic to learn more.

Example Use Case

As a practical example, let’s say a retail business is streaming transactions in a Kafka topic called transactions. A producer is writing data with a schema Payment to that Kafka topic transactions. If the producer is serializing the message value as Avro, then Schema Registry has a subject called transactions-value. If the producer is also serializing the message key as Avro, Schema Registry would have a subject called transactions-key, but for simplicity, in this tutorial consider only the message value. That Schema Registry subject transactions-value has at least one schema called Payment. The subject transactions-value defines the scope in which schemas for that subject can evolve and Schema Registry does compatibility checking within this scope. In this scenario, if developers evolve the schema Payment and produce new messages to the topic transactions, Schema Registry checks that those newly evolved schemas are compatible with older schemas in the subject transactions-value and adds those new schemas to the subject.

Setup

Prerequisites

Verify that you have installed the following on your local machine:

  • Confluent Platform
  • Confluent CLI
  • Java 8 or 11 to run Confluent Platform (for full information about Confluent Platform Java prerequisites, see Java under Supported Versions and Interoperability)
  • Maven to compile the client Java code
  • jq tool to nicely format the results from querying the Schema Registry REST endpoint

Environment Setup

  1. Use the Quick Start for Confluent Platform to bring up a single-node Confluent Platform development environment. With a single-line confluent local command, you can have a basic Kafka cluster with Schema Registry, Control Center, and other services running on your local machine.

    confluent local services start
    

    Your output should resemeble:

    Starting zookeeper
    zookeeper is [UP]
    Starting kafka
    kafka is [UP]
    Starting schema-registry
    schema-registry is [UP]
    Starting kafka-rest
    kafka-rest is [UP]
    Starting connect
    connect is [UP]
    Starting ksql-server
    ksql-server is [UP]
    Starting control-center
    control-center is [UP]
    
  2. Clone the Confluent examples repo from GitHub and work in the clients/avro/ subdirectory, which provides the sample code you will compile and run in this tutorial.

    git clone https://github.com/confluentinc/examples.git
    
    cd examples/clients/avro
    
    git checkout 7.6.0-post
    
  3. Create a local configuration file with all the Kafka and Schema Registry connection information that is running on your local machine, and save it to $HOME/.confluent/java.config, where $HOME represents your user home directory. It should resemble below:

    # Required connection configs for Kafka producer, consumer, and admin
    bootstrap.servers={{ BROKER_ENDPOINT }}
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
    sasl.mechanism=PLAIN
    # Required for correctness in Apache Kafka clients prior to 2.6
    client.dns.lookup=use_all_dns_ips
    
    # Best practice for higher availability in Apache Kafka clients prior to 3.0
    session.timeout.ms=45000
    
    # Best practice for Kafka producer to prevent data loss
    acks=all
    
    # Required connection configs for Confluent Cloud Schema Registry
    schema.registry.url=https://{{ SR_ENDPOINT }}
    basic.auth.credentials.source=USER_INFO
    basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
    

Create the transactions topic

For the exercises in this tutorial, you will be producing to and consuming from a topic called transactions. Create this topic in Control Center.

  1. Navigate to the Control Center web interface at http://localhost:9021/.

    Important

    It may take a minute or two for Control Center to come online.

    ../_images/c3-landing-page.png
  2. Click into the cluster, select Topics and click Add a topic.

    ../_images/c3-create-topic-sr.png
  3. Name the topic transactions and click Create with defaults.

    ../_images/c3-create-topic-name-sr.png

    The new topic is displayed.

    ../_images/c3-create-topic-new-sr.png

Schema Definition

The first thing developers need to do is agree on a basic schema for data. Client applications form a contract:

  • producers will write data in a schema
  • consumers will be able to read that data

Consider the original Payment schema Payment.avsc. To view the schema, run this command:

cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc

Observe the schema definition:

{
 "namespace": "io.confluent.examples.clients.basicavro",
 "type": "record",
 "name": "Payment",
 "fields": [
     {"name": "id", "type": "string"},
     {"name": "amount", "type": "double"}
 ]
}

Here is a break-down of what this schema defines:

  • namespace: a fully qualified name that avoids schema naming conflicts
  • type: Avro data type, for example, record, enum, union, array, map, or fixed
  • name: unique schema name in this namespace
  • fields: one or more simple or complex data types for a record. The first field in this record is called id, and it is of type string. The second field in this record is called amount, and it is of type double.

Client Applications Writing Avro

Maven

This tutorial uses Maven to configure the project and dependencies. Java applications that have Kafka producers or consumers using Avro require pom.xml files to include, among other things:

  • Confluent Maven repository
  • Confluent Maven plugin repository
  • Dependencies org.apache.avro.avro and io.confluent.kafka-avro-serializer to serialize data as Avro
  • Plugin avro-maven-plugin to generate Java class files from the source schema

The pom.xml file may also include:

  • Plugin kafka-schema-registry-maven-plugin to check compatibility of evolving schemas

For a full pom.xml example, refer to this pom.xml.

Configuring Avro

Kafka applications using Avro data and Schema Registry need to specify at least two configuration parameters:

  • Avro serializer or deserializer
  • Properties to connect to Schema Registry

There are two basic types of Avro records that your application can use:

  • a specific code-generated class, or
  • a generic record

The examples in this tutorial demonstrate how to use the specific Payment class. Using a specific code-generated class requires you to define and compile a Java class for your schema, but it easier to work with in your code.

However, in other scenarios where you need to work dynamically with data of any type and do not have Java classes for your record types, use GenericRecord.

Confluent Platform also provides a serializer and deserializer for writing and reading data in “reflection Avro” format. To learn more, see Reflection Based Avro Serializer and Deserializer.

Java Producers

Within the client application, Java producers need to configure the Avro serializer for the Kafka value (or Kafka key) and URL to Schema Registry. Then the producer can write records where the Kafka value is of Payment class.

Example Producer Code

When constructing the producer, configure the message value class to use the application’s code-generated Payment class. For example:

...
import io.confluent.kafka.serializers.KafkaAvroSerializer;
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
...
KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props));
final Payment payment = new Payment(orderId, 1000.00d);
final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
producer.send(record);
...

Because the pom.xml includes avro-maven-plugin, the Payment class is automatically generated during compile.

In this example, the connection information to the Kafka brokers and Schema Registry is provided by the configuration file that is passed into the code, but if you want to specify the connection information directly in the client application, see this java template.

For a full Java producer example, refer to the producer example.

Run the Producer

Run the following commands in a shell from examples/clients/avro.

  1. To run this producer, first compile the project:

    mvn clean compile package
    
  2. From the Control Center navigation menu at http://localhost:9021/, make sure the cluster is selected, and click Topics.

    Next, click the transactions topic and go to the Messages tab.

    You should see no messages because no messages have been produced to this topic yet.

  3. Run ProducerExample, which produces Avro-formatted messages to the transactions topic. Pass in the path to the file you created earlier, $HOME/.confluent/java.config.

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \
      -Dexec.args="$HOME/.confluent/java.config"
    

    The command takes a moment to run. When it completes, you should see:

    ...
    Successfully produced 10 messages to a topic called transactions
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    ...
    
  4. Now you should be able to see messages in Control Center by inspecting the transactions topic as it dynamically deserializes the newly arriving data that was serialized as Avro.

    At http://localhost:9021/, click into the cluster on the left, then go to Topics -> transactions -> Messages.

    Tip

    If you do not see any data, rerun the Producer and verify it completed successfully, and look at the Control Center again. The messages do not persist in the Console, so you need to view them soon after you run the producer.

    ../_images/c3-inspect-transactions.png

Java Consumers

Within the client application, Java consumers need to configure the Avro deserializer for the Kafka value (or Kafka key) and URL to Schema Registry. Then the consumer can read records where the Kafka value is of Payment class.

Example Consumer Code

By default, each record is deserialized into an Avro GenericRecord, but in this tutorial the record should be deserialized using the application’s code-generated Payment class. Therefore, configure the deserializer to use Avro SpecificRecord, i.e., SPECIFIC_AVRO_READER_CONFIG should be set to true. For example:

...
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
...
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
...
KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props));
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
  ConsumerRecords<String, Payment> records = consumer.poll(100);
  for (ConsumerRecord<String, Payment> record : records) {
    String key = record.key();
    Payment value = record.value();
  }
}
...

Because the pom.xml includes avro-maven-plugin, the Payment class is automatically generated during compile.

In this example, the connection information to the Kafka brokers and Schema Registry is provided by the configuration file that is passed into the code, but if you want to specify the connection information directly in the client application, see this java template.

For a full Java consumer example, refer to the consumer example.

Run the Consumer

  1. To run this consumer, first compile the project.

    mvn clean compile package
    

    The BUILD SUCCESS message indicates the project built, and the command prompt becomes available again.

    (If you got an error about missing dependencies for slf4j-log4j12 and confluent-log4j, install current versions of these libraries.)

  2. Then run ConsumerExample (assuming you already ran the ProducerExample above). Pass in the path to the file you created earlier, $HOME/.confluent/java.config.

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \
      -Dexec.args="$HOME/.confluent/java.config"
    

    You should see:

    ...
    key = id0, value = {"id": "id0", "amount": 1000.0}
    key = id1, value = {"id": "id1", "amount": 1000.0}
    key = id2, value = {"id": "id2", "amount": 1000.0}
    key = id3, value = {"id": "id3", "amount": 1000.0}
    key = id4, value = {"id": "id4", "amount": 1000.0}
    key = id5, value = {"id": "id5", "amount": 1000.0}
    key = id6, value = {"id": "id6", "amount": 1000.0}
    key = id7, value = {"id": "id7", "amount": 1000.0}
    key = id8, value = {"id": "id8", "amount": 1000.0}
    key = id9, value = {"id": "id9", "amount": 1000.0}
    ...
    
  3. Press Ctrl+C to stop.

Other Kafka Clients

The objective of this tutorial is to learn about Avro and Schema Registry centralized schema management and compatibility checks. To keep examples simple, this tutorial focuses on Java producers and consumers, but other Kafka clients work in similar ways. For examples of other Kafka clients interoperating with Avro and Schema Registry:

Centralized Schema Management

Viewing Schemas in Schema Registry

At this point, you have producers serializing Avro data and consumers deserializing Avro data. The producers are registering schemas to Schema Registry and consumers are retrieving schemas from Schema Registry.

  1. From the Control Center navigation menu at http://localhost:9021/, make sure the cluster is selected on the left, and click Topics.

  2. Click the transactions topic and go to the Schema tab to retrieve the latest schema from Schema Registry for this topic:

    ../_images/c3-schema-transactions.png

    The schema is identical to the schema file defined for Java client applications.

Using curl to Interact with Schema Registry

You can also use curl commands to connect directly to the REST endpoint in Schema Registry to view subjects and associated schemas.

  1. To view all the subjects registered in Schema Registry (assuming Schema Registry is running on the local machine listening on port 8081):

    curl --silent -X GET http://localhost:8081/subjects/ | jq .
    

    Here is the expected output of the above command:

    [
      "transactions-value"
    ]
    

    In this example, the Kafka topic transactions has messages whose value (that is, payload) is Avro, and by default the Schema Registry subject name is transactions-value.

  2. To view the latest schema for this subject in more detail:

    curl --silent -X GET http://localhost:8081/subjects/transactions-value/versions/latest | jq .
    

    Here is the expected output of the above command:

    {
      "subject": "transactions-value",
      "version": 1,
      "id": 1,
      "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
    }
    

    Here is a break-down of what this version of the schema defines:

    • subject: the scope in which schemas for the messages in the topic transactions can evolve
    • version: the schema version for this subject, which starts at 1 for each subject
    • id: the globally unique schema version id, unique across all schemas in all subjects
    • schema: the structure that defines the schema format

    Notice that in the output to the curl command above, the schema is escaped JSON; the double quotes are preceded by backslashes.

  3. Based on the schema id, you can also retrieve the associated schema by querying Schema Registry REST endpoint as follows:

    curl --silent -X GET http://localhost:8081/schemas/ids/1 | jq .
    

    Here is the expected output:

    {
      "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
    }
    

Schema IDs in Messages

Integration with Schema Registry means that Kafka messages do not need to be written with the entire Avro schema. Instead, Kafka messages are written with the schema id. The producers writing the messages and the consumers reading the messages must be using the same Schema Registry to get the same mapping between a schema and schema id.

In this example, a producer sends the new schema for Payments to Schema Registry. Schema Registry registers this schema Payments to the subject transactions-value, and returns the schema id of 1 to the producer. The producer caches this mapping between the schema and schema id for subsequent message writes, so it only contacts Schema Registry on the first schema write.

When a consumer reads this data, it sees the Avro schema id of 1 and sends a schema request to Schema Registry. Schema Registry retrieves the schema associated to schema id 1, and returns the schema to the consumer. The consumer caches this mapping between the schema and schema id for subsequent message reads, so it only contacts Schema Registry on the first schema id read.

Auto Schema Registration

By default, client applications automatically register new schemas. If they produce new messages to a new topic, then they will automatically try to register new schemas. This is convenient in development environments, but in production environments it’s recommended that client applications do not automatically register new schemas. Best practice is to register schemas outside of the client application to control when schemas are registered with Schema Registry and how they evolve.

Within the application, you can disable automatic schema registration by setting the configuration parameter auto.register.schemas=false, as shown in the following example.

props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);

Tip

  • If you want to enable use.latest.version for producers, you must disable auto schema registration by setting auto.register.schemas to false, and use.latest.version to true. (The opposite of their defaults.) The option auto.register.schemas must be set to false in order for use.latest.version to work.

    Setting auto.register.schemas to false disables auto-registration of the event type, so that it does not override the latest schema in the subject. Setting use.latest.version to true causes the serializer to look up the latest schema version in the subject and use that for serialization. If use.latest.version is set to false (which is the default), the serializer will look for the event type in the subject and fail to find it.

    You can also set use.latest.version on consumers, which causes the consumer to look up the latest schema version in the subject and use that as the target schema during deserialization.

  • See also, Schema Registry Configuration Options for Kafka Connect.

  • The configuration option auto.register.schemas is a Confluent Platform feature; not available in Apache Kafka®.

To manually register the schema outside of the application, you can use Control Center.

First, create a new topic called test in the same way that you created a new topic called transactions earlier in the tutorial. Then from the Schema tab, click Set a schema to define the new schema. Specify values for:

  • namespace: a fully qualified name that avoids schema naming conflicts
  • type: Avro data type, one of record, enum, union, array, map, fixed
  • name: unique schema name in this namespace
  • fields: one or more simple or complex data types for a record. The first field in this record is called id, and it is of type string. The second field in this record is called amount, and it is of type double.

If you were to define the same schema as used earlier, you would enter the following in the schema editor:

{
  "type": "record",
  "name": "Payment",
  "namespace": "io.confluent.examples.clients.basicavro",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "amount",
      "type": "double"
    }
  ]
}

If you prefer to connect directly to the REST endpoint in Schema Registry, then to define a schema for a new subject for the topic test, run the command below.

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' \
  http://localhost:8081/subjects/test-value/versions

In this sample output, it creates a schema with id of 1.:

{"id":1}

Schema Evolution and Compatibility

Evolving Schemas

So far in this tutorial, you have seen the benefit of Schema Registry as being centralized schema management that enables client applications to register and retrieve globally unique schema ids. The main value of Schema Registry, however, is in enabling schema evolution. Similar to how APIs evolve and need to be compatible for all applications that rely on old and new versions of the API, schemas also evolve and likewise need to be compatible for all applications that rely on old and new versions of a schema. This schema evolution is a natural behavior of how applications and data develop over time.

Schema Registry allows for schema evolution and provides compatibility checks to ensure that the contract between producers and consumers is not broken. This allows producers and consumers to update independently and evolve their schemas independently, with assurances that they can read new and legacy data. This is especially important in Kafka because producers and consumers are decoupled applications that are sometimes developed by different teams.

Transitive compatibility checking is important once you have more than two versions of a schema for a given subject. If compatibility is configured as transitive, then it checks compatibility of a new schema against all previously registered schemas; otherwise, it checks compatibility of a new schema only against the latest schema.

For example, if there are three schemas for a subject that change in order X-2, X-1, and X then:

  • transitive: ensures compatibility between X-2 <==> X-1 and X-1 <==> X and X-2 <==> X
  • non-transitive: ensures compatibility between X-2 <==> X-1 and X-1 <==> X, but not necessarily X-2 <==> X

Refer to an example of schema changes which are incrementally compatible, but not transitively so.

The Confluent Schema Registry default compatibility type BACKWARD is non-transitive, which means that it’s not BACKWARD_TRANSITIVE. As a result, new schemas are checked for compatibility only against the latest schema.

These are the compatibility types:

  • BACKWARD: (default) consumers using the new schema can read data written by producers using the latest registered schema
  • BACKWARD_TRANSITIVE: consumers using the new schema can read data written by producers using all previously registered schemas
  • FORWARD: consumers using the latest registered schema can read data written by producers using the new schema
  • FORWARD_TRANSITIVE: consumers using all previously registered schemas can read data written by producers using the new schema
  • FULL: the new schema is forward and backward compatible with the latest registered schema
  • FULL_TRANSITIVE: the new schema is forward and backward compatible with all previously registered schemas
  • NONE: schema compatibility checks are disabled

Refer to Schema Evolution and Compatibility for a more in-depth explanation on the compatibility types.

Failing Compatibility Checks

Schema Registry checks compatibility as schemas evolve to uphold the producer-consumer contract. Without Schema Registry checking compatibility, your applications could potentially break on schema changes.

In the Payment schema example, let’s say the business now tracks additional information for each payment, for example, a field region that represents the place of sale. Consider the Payment2a schema which includes this extra field region:

cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2a.avsc
{
 "namespace": "io.confluent.examples.clients.basicavro",
 "type": "record",
 "name": "Payment",
 "fields": [
     {"name": "id", "type": "string"},
     {"name": "amount", "type": "double"},
     {"name": "region", "type": "string"}
 ]
}

Before proceeding, because the default Schema Registry compatibility is backward, think about whether this new schema is backward compatible. Specifically, ask yourself whether a consumer can use this new schema to read data written by producers using the older schema without the region field. The answer is no. Consumers will fail reading data with the older schema because the older data does not have the region field, therefore this schema is not backward compatible.

Confluent provides a Schema Registry Maven Plugin, which you can use to check compatibility in development or integrate into your CI/CD pipeline.

Our sample pom.xml includes this plugin to enable compatibility checks.

...
<properties>
  <schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
  <schemaRegistryBasicAuthUserInfo></schemaRegistryBasicAuthUserInfo>
</properties>
...
<build>
  <plugins>
  ...
    <plugin>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-schema-registry-maven-plugin</artifactId>
        <version>${confluent.version}</version>
        <configuration>
            <schemaRegistryUrls>
                <param>${schemaRegistryUrl}</param>
            </schemaRegistryUrls>
            <userInfoConfig>${schemaRegistryBasicAuthUserInfo}</userInfoConfig>
            <subjects>
                <transactions-value>src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2a.avsc</transactions-value>
            </subjects>
        </configuration>
        <goals>
            <goal>test-compatibility</goal>
        </goals>
    </plugin>
...
  </plugins>
</build>

It is currently configured to check compatibility of the new Payment2a schema for the transactions-value subject in Schema Registry.

  1. Run the compatibility check and verify that it fails:

    mvn io.confluent:kafka-schema-registry-maven-plugin:test-compatibility
    

    Here is the error message you will get:

    ...
    [ERROR] Schema examples/clients/avro/src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2a.avsc is not compatible with subject(transactions-value)
    ...
    
  2. Try to register the new schema Payment2a manually to Schema Registry, which is a useful way for non-Java clients to check compatibility if you are not using Control Center:

    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\"}]}"}' \
      http://localhost:8081/subjects/transactions-value/versions
    

    As expected, Schema Registry rejects the schema with an error message that it is incompatible:

    {"error_code":409,"message":"Schema being registered is incompatible with an earlier schema"}
    

Passing Compatibility Checks

To maintain backward compatibility, a new schema must assume default values for the new field if it is not provided.

  1. Consider an updated Payment2b schema that has a default value for region. To view the schema, run this command:

    cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2b.avsc
    

    You should see the following output.

    {
     "namespace": "io.confluent.examples.clients.basicavro",
     "type": "record",
     "name": "Payment",
     "fields": [
         {"name": "id", "type": "string"},
         {"name": "amount", "type": "double"},
         {"name": "region", "type": "string", "default": ""}
     ]
    }
    
  2. From UI, click the transactions topic and go to the Schema tab to retrieve the transactions topic’s latest schema from Schema Registry.

  3. Click Edit Schema.

    ../_images/tutorial-c3-edit-schema.png
  4. Add the new field region again, this time including the default value as shown below, then click Save.

    {
     "name": "region",
     "type": "string",
     "default": ""
    }
    
  5. Verify that the new schema is accepted.

    ../_images/tutorial-c3-edit-schema-pass.png

    Note

    If you get error messages about invalid Avro, check syntax; for example, quotes and colons, enclosing brackets, comma-separated from the previous field, and so on.)

  6. Think about the registered schema versions. The Schema Registry subject for the topic transactions that is called transactions-value has two schemas:

    • version 1 is Payment.avsc
    • version 2 is Payment2b.avsc that has the additional field for region with a default empty value.
  7. In the UI, still on the Schema tab for the topic transactions, click Version history and select Turn on version diff to compare the two versions:

    ../_images/tutorial-c3-schema-compare.png
  8. At the command line, go back to the Schema Registry Maven Plugin, update the pom.xml to refer to Payment2b.avsc instead of Payment2a.avsc.

  9. Re-run the compatibility check and verify that it passes:

    mvn io.confluent:kafka-schema-registry-maven-plugin:test-compatibility
    
  10. Verify that get this message showing that the schema passed the compatibility check.

    ...
    [INFO] Schema examples/clients/avro/src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2b.avsc is compatible with subject(transactions-value)
    ...
    
  11. If you prefer to connect directly to the REST endpoint in Schema Registry, then to register the new schema Payment2b, run the command below. It should succeed.

    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\",\"default\":\"\"}]}"}' \
      http://localhost:8081/subjects/transactions-value/versions
    

    The above curl command, if successful, returns the version id of the new schema:

    {"id":2}
    
  12. View the latest subject for transactions-value in Schema Registry:

    curl --silent -X GET http://localhost:8081/subjects/transactions-value/versions/latest | jq .
    

    This command returns the latest Schema Registry subject for the transactions-value topic, including version number, id, and a description of the schema in JSON:

    {
      "subject": "transactions-value",
      "version": 2,
      "id": 2,
      "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\",\"default\":\"\"}]}"
    }
    

    Notice the changes:

    • version: changed from 1 to 2
    • id: changed from 1 to 2
    • schema: updated with the new field region that has a default value

Changing Compatibility Type

The default compatibility type is backward, but you may change it globally or per subject.

To change the compatibility type per subject from the UI, click the transactions topic and go to the Schema tab to retrieve the transactions topic’s latest schema from Schema Registry. Click Edit Schema and then click Compatibility Mode.

../_images/c3-edit-compatibility.png

Notice that the compatibility for this topic is set to the default backward, but you may change this as needed.

If you prefer to connect directly to the REST endpoint in Schema Registry, then to change the compatibility type for the topic transactions, i.e., for the subject transactions-value, run the example command below.

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
       --data '{"compatibility": "BACKWARD_TRANSITIVE"}' \
       http://localhost:8081/config/transactions-value