Data Contracts for Schema Registry

Confluent Schema Registry adds support for tags, metadata, and rules, which together support the concept of a Data Contract. As a part of a Stream Governance solution, data contracts play a key role in ensuring data quality, data consistency, interoperability, and compatibility when sharing information across different systems or organizations.

Requirements

Schema rules are only available on Confluent Enterprise and Confluent Cloud with the Stream Governance “Advanced” package.

Confluent Cloud requirements

To work with data contracts on Confluent Cloud Schema Registry:

Enable Schema Registry with the Advanced Stream Governance package, as described in Packages, Features, and Limits and in Choose a Stream Governance package and enable Schema Registry for Confluent Cloud.

Confluent Platform requirements

Schema rules are only available on Confluent Enterprise (not on the Community edition). To learn more about Confluent Platform packages, see Platform packages.

To enable schema rules for Confluent Enterprise, add the following to the Schema Registry properties before starting Schema Registry. The value of the property below can be a comma-separated list of class names:

resource.extension.class=io.confluent.kafka.schemaregistry.rulehandler.RuleSetResourceExtension

Limitations

Current limitations are:

  • Only Java clients support rules execution.
  • Kafka Connect and ksqlDB on Confluent Cloud do not support rules execution.
  • Confluent Cloud Console and Confluent Control Center do not show the new properties for Data Contracts on the schema view page, in particular metadata and rules.
  • For Avro and Protobuf, JSONata is less efficient than CEL for CONDITION rules.

Understanding the scope of a data contract

A data contract is a formal agreement between an upstream component and a downstream component on the structure and semantics of data that is in motion. A schema is only one element of a data contract. A data contract specifies and supports the following aspects of an agreement:

  • Structure. This is the part of the contract that is covered by the schema, which defines the fields and their types.
  • Integrity constraints. This includes declarative constraints or data quality rules on the domain values of fields, such as the constraint that an age must be a positive integer.
  • Metadata. Metadata is additional information about the schema or its constituent parts, such as whether a field contains sensitive information. Metadata can also include documentation for a data contract, such as who created it.
  • Rules or policies. These data rules or policies can enforce that a field that contains sensitive information must be encrypted, or that a message containing an invalid age must be sent to a dead letter queue.
  • Change or evolution. This implies that data contracts are versioned, and can support declarative migration rules for how to transform data from one version to another, so that even changes that would normally break downstream components can be easily accommodated.

Keeping in mind that a data contract is an agreement between an upstream component and a downstream component, note that:

  • The upstream component enforces the data contract.
  • The downstream component can assume that the data it receives conforms to the contract.

Data contracts are important because they provide transparency over dependencies and data usage in a stream architecture. They also help to ensure the consistency, reliability, and quality of the data in motion.

The upstream component could be an Apache Kafka® producer, while the downstream component would be the Kafka consumer. But the upstream component could also be a Kafka consumer, and the downstream component would be the application in which the Kafka consumer resides. This differentiation is important in schema evolution, where the producer may be using a newer version of the data contract, but the downstream application still expects an older version. In this case the data contract is used by the Kafka consumer to mediate between the Kafka producer and the downstream application, ensuring that the data received by the application matches the older version of the data contract, possibly using declarative transformation rules to massage the data into the desired form.

../../_images/data-contracts.png

In the diagram above, the producer may be using version 2 of the data contract, yet the application in which the consumer resides expects version 1 of the data contract. In this case the consumer will use a migration rule to transform the data so that it conforms to version 1.

To support the five aspects of a data contract mentioned above, Schema Registry has been enhanced with tags, metadata, and rules.

Tags

Tags are used to add annotations to a schema or its parts. Tags can either be inlined directly in a schema, or can be specified externally to the schema.

Below is an example of an inlined tag for an Avro schema:

{
  "type":"record",
  "name":"MyRecord",
  "fields":[{
    "name":"ssn",
    "type":"string",
    "confluent:tags": [ "PII", "PRIVATE" ]
  }]
}

For a JSON Schema:

{
  "type": "object",
  "properties": {
    "ssn": {
      "type": "string",
      "confluent:tags": [ "PII", "PRIVATE" ]
    }
  }
}

For a Protobuf schema:

syntax = "proto3";

import "confluent/meta.proto";


message MyRecord {
  string ssn = 1 [
    (confluent.field_meta).tags = "PII",
    (confluent.field_meta).tags = "PRIVATE"
  ];
}

When sending requests to Schema Registry, any of the above schemas is passed as the value of a field named schema in a JSON payload. Previously, the JSON payload had only three fields: schemaType, schema, and references. The JSON payload has been expanded to include two additional fields called metadata and ruleSet.

{
  "schemaType": "...",
  "schema": "...",
  "references": [...],
  "metadata": {
    "tags": {...},
    "properties": {...}
  },
  "ruleSet": {
    "migrationRules": [...],
    "domainRules": [...]
  }
}

The tags property of the metadata property allows external tags to be specified.

{
  "schema": "...",
  "metadata": {
    "tags": {
      "MyRecord.ssn": [ "PII" ]
    }
  },
  "ruleSet": ...
}

One feature supported by external tags that is not available with inline tags is the support for path wildcards.

{
  "schema": "...",
  "metadata": {
    "tags": {
      "**.ssn": [ "PII" ]
    }
  },
  "ruleSet": ...
}

Note that a path is a set of elements, separated by dots (.). Path wildcards support the following characters:

  • * matches any number of characters in a path element; for example, a*.bob matches alice.bob
  • ** matches any number of characters across multiple path elements; for example, a** matches alice.bob
  • ? matches any single character; for example, alic?.bob matches alice.bob
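The wildcard semantics listed above can be approximated by translating a path pattern into a regular expression. The following is a minimal sketch, not the matcher that Schema Registry uses: the class and method names are hypothetical, and it assumes that ? never matches the dot separator.

```java
import java.util.regex.Pattern;

// Illustrative only: an approximation of the documented wildcard semantics,
// not the implementation used by Schema Registry.
class TagPathWildcard {

  // Translate a tag path pattern into an equivalent regular expression.
  static Pattern toRegex(String pathPattern) {
    StringBuilder regex = new StringBuilder();
    for (int i = 0; i < pathPattern.length(); i++) {
      char c = pathPattern.charAt(i);
      if (c == '*') {
        boolean doubleStar =
            i + 1 < pathPattern.length() && pathPattern.charAt(i + 1) == '*';
        if (doubleStar) {
          regex.append(".*");    // ** may cross path element boundaries
          i++;                   // consume the second star
        } else {
          regex.append("[^.]*"); // * stays within a single path element
        }
      } else if (c == '?') {
        regex.append("[^.]");    // assumption: ? does not match the separator
      } else {
        regex.append(Pattern.quote(String.valueOf(c)));
      }
    }
    return Pattern.compile(regex.toString());
  }

  // True if the whole path matches the pattern.
  static boolean matches(String pathPattern, String path) {
    return toRegex(pathPattern).matcher(path).matches();
  }
}
```

With this sketch, TagPathWildcard.matches("**.ssn", "MyRecord.ssn") returns true, while "*.ssn" does not match a path with more than two elements.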

Note

  • On Confluent Cloud, any tags used in a schema must first have their definitions created in the Stream Catalog. To learn more, see Inline and External tags in the Stream Catalog user guide.
  • For a schema with references, only the external tags at the root schema apply. This is different from inline tags, which are always applicable, regardless of whether the inline tag is in the root schema or one of the referenced schemas.

To learn more about tags, see Tagging Data and Schemas and Tag API Examples.

Metadata properties

A data contract can be enhanced with arbitrary metadata properties.

{
  "schema": "...",
  "metadata": {
    "properties": {
      "owner": "Bob Jones",
      "email": "bob@acme.com"
    }
  },
  "ruleSet": ...
}

Rules

Rules can be used to specify integrity constraints or data policies in a data contract. Rules have several properties.

  • name - a user-defined name that can be used to reference the rule
  • doc - an optional description
  • kind - either CONDITION or TRANSFORM
  • type - the type of rule, which invokes a specific rule executor, such as Google Common Expression Language (CEL) or JSONata.
  • mode - modes can be grouped as follows:
    • Migration rules can be specified for an UPGRADE, DOWNGRADE, or both (UPDOWN). Migration rules are used during complex schema evolution.
    • Domain rules can be specified during serialization (WRITE), deserialization (READ) or both (WRITEREAD). Domain rules can be used to transform the domain values in a message payload.
  • tags - the tags to which the rule applies, if any
  • params - a set of static parameters for the rule, which is optional. These are key-value pairs that are passed to the rule.
  • expr - the body of the rule, which is optional
  • onSuccess - an optional action to execute if the rule succeeds, otherwise the built-in action type NONE is used. For UPDOWN and WRITEREAD rules, one can specify two actions separated by commas, such as “NONE,ERROR” for a WRITEREAD rule. In this case NONE applies to WRITE and ERROR applies to READ.
  • onFailure - an optional action to execute if the rule fails, otherwise the built-in action type ERROR is used. For UPDOWN and WRITEREAD rules, one can specify two actions separated by commas, as mentioned above.
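The comma-separated action pair described for onSuccess and onFailure can be sketched in plain Java. This is an illustrative helper, not client code: the class and method names are hypothetical, and it assumes that a single action applies to both directions and that, by analogy with WRITEREAD, the first action of an UPDOWN pair applies to UPGRADE.

```java
// Illustrative only: resolves which action applies for the mode being executed.
class RuleActionPair {

  // spec is the onSuccess/onFailure value, such as "DLQ" or "NONE,ERROR";
  // builtInDefault is NONE for onSuccess and ERROR for onFailure.
  static String actionFor(String spec, String builtInDefault, String currentMode) {
    if (spec == null || spec.trim().isEmpty()) {
      return builtInDefault;             // nothing specified: use the built-in action
    }
    String[] parts = spec.split(",");
    if (parts.length == 1) {
      return parts[0].trim();            // assumption: one action covers both directions
    }
    // The first action applies to WRITE (or UPGRADE), the second to READ (or DOWNGRADE).
    boolean second = currentMode.equals("READ") || currentMode.equals("DOWNGRADE");
    return parts[second ? 1 : 0].trim();
  }
}
```

For the WRITEREAD example above, actionFor("NONE,ERROR", "NONE", "WRITE") yields NONE and actionFor("NONE,ERROR", "NONE", "READ") yields ERROR.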

Every rule executor must implement a method named type() that returns a string. When a rule is specified in a schema, the client, during serialization or deserialization, checks to determine if an appropriate rule executor for the given rule type has been configured on the client. If so, the rule executor is used to run the rule.

Data quality rules

One of the built-in rule types is CEL, which supports data quality rules. The CEL rule type uses the Google Common Expression Language (CEL). To import the CEL rule executor, include the following dependency:

<dependency>
     <groupId>io.confluent</groupId>
     <artifactId>kafka-schema-rules</artifactId>
     <version>7.4.0</version>
</dependency>

Next, configure the Java client to use this executor:
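A minimal sketch of such a client configuration, expressed as Java Properties. The rule.executors property and the checkSsnLen name come from this section, and the per-executor key follows the same pattern as the rule.actions properties shown later on this page. The executor class name is an assumption and should be verified against the kafka-schema-rules artifact.

```java
import java.util.Properties;

// Illustrative only: builds the rule executor settings for a Java client.
class CelExecutorClientConfig {

  static Properties build() {
    Properties props = new Properties();
    // Comma-separated list of executor names; here the name matches the rule name.
    props.put("rule.executors", "checkSsnLen");
    // Assumption: fully-qualified class name of the CEL executor.
    props.put("rule.executors.checkSsnLen.class",
        "io.confluent.kafka.schemaregistry.rules.cel.CelExecutor");
    return props;
  }
}
```

These properties would be merged with the usual serializer settings before the producer or consumer is created.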

The CEL rule executor will register itself using the “CEL” rule type. Next, you need to associate a CEL rule with the schema. For example, suppose you want to validate that the ssn field is only 9 characters long before serializing a message. You can use a schema with the rule below, where the message is passed to the rule as a variable with the name message:

{
  "schema": "...",
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsnLen",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "size(message.ssn) == 9"
      }
    ]
  }
}

Note that the example gives the rule the same name (checkSsnLen) as that passed to the rule.executors property. If you don’t use the same name, the client will use any executor that is registered for the given rule type on the client. However, it’s often useful to use the same name if you want to configure several rules of the same type (but with different names) differently.

During registration, if the schema is omitted above, then the ruleset will be attached to the latest schema in the subject. After registration, you need to specify use.latest.version=true on the client.

Now that you’ve set up a data quality rule, whenever a message is sent that has an ssn field that is not 9 characters in length, the serializer will throw an exception. Alternatively, if the validation fails, you can have the message sent to a dead-letter queue. Note that the action type “DLQ” is specified below. For the bidirectional rules (WRITEREAD, UPDOWN), you can specify a comma-separated pair of action types if needed.

{
  "schema": "...",
  "ruleSet": {
    "domainRules": [
        {
          "name": "checkSsnLen",
          "kind": "CONDITION",
          "type": "CEL",
          "mode": "WRITE",
          "expr": "size(message.ssn) == 9",
          "params": {
            "dlq.topic": "bad-data"
          },
          "onFailure": "DLQ"
        }
    ]
  }
}

Similar to rule executors, every rule action must implement a method named type() that returns a string.

When using a dead-letter queue to validate a record value, if the validation fails, you will usually want to capture both the record key and the record value. However, the record key is not passed to the serializer or deserializer that is validating the record value. To capture the record key, you must specify a “wrapping” serializer or deserializer for the record key that will capture the key and make it available to the serializer or deserializer for the record value. So if the record key is a simple string, instead of specifying:

key.serializer=org.apache.kafka.common.serialization.StringSerializer

specify the wrapping serializer as follows:

key.serializer=io.confluent.kafka.serializers.WrapperKeySerializer
wrapped.key.serializer=org.apache.kafka.common.serialization.StringSerializer

To learn more about CEL, see https://github.com/google/cel-spec and Understanding CEL in Data Contract Rules.

Field-Level transforms

Field-level transforms can also be specified using the Google Common Expression Language (CEL). This is accomplished with a rule executor of type CEL_FIELD, which transforms field values in a message. To use this executor, include the following dependency:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-rules</artifactId>
    <version>7.4.0</version>
</dependency>

The CEL field level rule executor will register itself using the “CEL_FIELD” rule type. The rule expression can be applied to multiple fields, but those fields must be of the same type. To specify the applicable fields for the rule expression, we use a guard, which is a CEL boolean expression. So the complete rule expression is of the form:

<CEL expr for guard> ; <CEL expr for transformation>

Suppose you want to provide a default value for a field if it is empty. You can use a schema with the rule below, where the field name is passed as name, the field value is passed as value, and the field type is passed as typeName. Note that the expr below is only applied to the field named occupation.

{
  "schema": "...",
  "ruleSet": {
    "domainRules": [
      {
        "name": "populateDefault",
        "kind": "TRANSFORM",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "name == 'occupation' ; value == '' ? 'unspecified' : value"
      }
    ]
  }
}

Alternatively, you can specify a guard of the form typeName == 'STRING' to apply the rule expression to all string fields, such as in the following:

{
  "schema": "...",
  "ruleSet": {
    "domainRules": [
      {
        "name": "populateDefault",
        "kind": "TRANSFORM",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "typeName == 'STRING' ; value == '' ? 'unspecified' : value"
      }
    ]
  }
}
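The guard-then-transform behavior of the two rules above can be pictured with plain Java functions. This is a conceptual sketch only: it does not evaluate CEL, the class and method names are hypothetical, and it assumes a flat message in which every field is a string.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Illustrative only: mimics "<guard> ; <transformation>" over string fields.
class FieldTransformSketch {

  // Applies the transformation to each field whose name passes the guard;
  // all other fields are copied unchanged.
  static Map<String, String> apply(Map<String, String> message,
                                   Predicate<String> guard,
                                   UnaryOperator<String> transformation) {
    Map<String, String> result = new LinkedHashMap<>();
    for (Map.Entry<String, String> field : message.entrySet()) {
      String value = field.getValue();
      boolean selected = guard.test(field.getKey());
      result.put(field.getKey(), selected ? transformation.apply(value) : value);
    }
    return result;
  }

  // Counterpart of: value == '' ? 'unspecified' : value
  static String defaultIfEmpty(String value) {
    return value.isEmpty() ? "unspecified" : value;
  }
}
```

A guard of name -> name.equals("occupation") corresponds to the first rule, and a guard that accepts every field corresponds to typeName == 'STRING' when all fields are strings.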

Note

Only primitive types (string, bytes, int, long, float, double, boolean) are supported for field transformations. Enums are not supported.

Event-condition-action rules

When using conditions, one can model arbitrary Event-Condition-Action (ECA) rules by specifying an action for onSuccess to use when the condition is true, and an action for onFailure to use when the condition is false. The action type for onSuccess defaults to the built-in action type NONE, and the action type for onFailure defaults to the built-in action type ERROR. You can explicitly set these values to NONE, ERROR, DLQ, or a custom action type. For example, you might want to implement an action to send an email whenever an event indicates a credit card is about to expire.

To implement a custom action type, implement a RuleAction interface and then register the action as follows:

# rule.actions takes a comma-separated list of names
rule.actions=myRuleName
rule.actions.myRuleName.class=com.myorg.MyCustomAction
rule.actions.myRuleName.param.myparam1=value1
rule.actions.myRuleName.param.myparam2=value2

The RuleAction interface looks as follows:

public interface RuleAction extends RuleBase {
  String type();

  void run(RuleContext ctx, Object message, RuleException ex)
    throws RuleException;
}
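As a sketch of a custom action in the spirit of the credit card example, the class below records a notification instead of sending an email. To keep the example self-contained, RuleBase, RuleContext, RuleException, and RuleAction are declared as local stand-ins that mirror the interface shown above. They are not the Confluent classes, and the action type name EXPIRY_ALERT is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: compiles without Confluent libraries by using stand-in types.
class ExpiryAlertActionSketch {

  // Local stand-ins (assumptions) for the real client types.
  interface RuleBase {}
  static class RuleContext {}
  static class RuleException extends Exception {
    RuleException(String message) { super(message); }
  }
  interface RuleAction extends RuleBase {
    String type();
    void run(RuleContext ctx, Object message, RuleException ex) throws RuleException;
  }

  // A custom action that queues a notification for each message it receives.
  static class ExpiryAlertAction implements RuleAction {
    final List<String> outbox = new ArrayList<>();

    @Override
    public String type() {
      return "EXPIRY_ALERT"; // the value referenced by onSuccess or onFailure
    }

    @Override
    public void run(RuleContext ctx, Object message, RuleException ex) {
      // A real action might send an email here; the sketch only records the text.
      outbox.add("Card about to expire: " + message);
    }
  }
}
```

In a real client, the implementing class would be registered through the rule.actions properties shown above.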

Complex schema evolution

Schema Registry is used to serve and maintain a versioned history of schemas. It also supports schema evolution, by only allowing schemas to evolve according to the configured compatibility settings. A versioned schema history is called a subject, and the compatibility rules enforce a subject-schema constraint, only allowing the schema in a subject to evolve in a manner that does not break clients.

But what if you want to evolve the schema in an incompatible manner without breaking clients? Such a scenario requires some form of data migration, and there are two possible solutions for a migration:

  • Inter-topic migrations: Set up a new Kafka topic and translate the topic data from the old format in the old topic to the new format in the new topic.
  • Intra-topic migrations: Use transformations when consuming from a topic to translate the topic data from the old format to the new format.
../../_images/data-contracts-transforms-for-schema-evolution.png

The complexity with the first approach is in switching consumers from reading the old topic to the new topic in a way that does not skip data or cause duplicate processing. To do this properly, a correlation would need to be established between the offsets of the two topics, and the consumer would need to start reading from the correct correlation point in the new topic. In addition, if something goes wrong you might want the ability to switch back to the old topic, again using the correct correlation point.

With the second option, the client does not need to be switched over to a different topic, but instead continues to read from the same topic, and a set of declarative migration rules will massage the data into the form that the consumer expects. With such declarative migration rules, we can support a system in which producers are separately using versions 1, 2, and 3 of a schema, which are all incompatible, and consumers that expect versions 1, 2, or 3 each see a message transformed to the desired version, regardless of which version the producer sent. This is often a cleaner and simpler solution for complex schema evolution, and it became available in Schema Registry with the release of Confluent Platform 7.4.

Before discussing how to use declarative migration rules, the next sections cover a few features that were added to Schema Registry in order to support complex schema evolution.

Application major versioning

The current versioning scheme in Schema Registry is a monotonically increasing sequence of numbers. If you want to identify when a breaking or incompatible change occurs in the schema history, you can use the recently added metadata property of the schema object. The name “application.major.version” below is arbitrary; you could call it “major.version”, for example.

{
  "schema": "...",
  "metadata": {
    "properties": {
      "application.major.version": "2"
    }
  },
  "ruleSet": ...
}

You can then specify that a consumer only use the latest schema of a specific major version.

use.latest.with.metadata=application.major.version=2
latest.cache.ttl.sec=300

The above example also specifies that the client should check for a new latest version every 5 minutes. This TTL configuration can also be used with the use.latest.version=true configuration.
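Conceptually, pinning a consumer to a major version means selecting the highest schema version whose metadata carries the requested property value. A minimal sketch of that selection follows. It is not the client's lookup logic, and the SchemaVersion class and the method name are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative only: picks the latest version with a matching metadata property.
class LatestWithMetadata {

  // Hypothetical holder for a version number and its metadata properties.
  static class SchemaVersion {
    final int version;
    final Map<String, String> properties;

    SchemaVersion(int version, Map<String, String> properties) {
      this.version = version;
      this.properties = properties;
    }
  }

  // Returns the highest version whose metadata has key=value, if any.
  static Optional<SchemaVersion> latestMatching(List<SchemaVersion> history,
                                                String key, String value) {
    SchemaVersion best = null;
    for (SchemaVersion candidate : history) {
      boolean matches = value.equals(candidate.properties.get(key));
      if (matches && (best == null || candidate.version > best.version)) {
        best = candidate;
      }
    }
    return Optional.ofNullable(best);
  }
}
```

With a history in which versions 2 and 3 carry application.major.version=2 and version 4 carries 3, the sketch selects version 3 for a consumer pinned to major version 2.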

Configuration enhancements

The compatibility level in Schema Registry can be set at the global level, or at the level of an individual subject. This setting is achieved by using a configuration object. Previously, the only field in the configuration object was for the compatibility level. The configuration object has been enhanced with the following properties:

  • compatibilityGroup - Only schemas that belong to the same compatibility group will be checked for compatibility.
  • defaultMetadata - Default value for the metadata to be used during schema registration.
  • overrideMetadata - Override value for the metadata to be used during schema registration.
  • defaultRuleSet - Default value for the ruleSet to be used during schema registration.
  • overrideRuleSet - Override value for the ruleSet to be used during schema registration.

For example, when registering a schema, schema metadata is initialized by:

  1. first, taking the default value
  2. then, merging the metadata from the schema on top of it
  3. finally, merging the override value for the final result

If the schema does not specify metadata, then during the second step, the schema will use the metadata from the previous version if it exists.
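The three-step initialization, including the fallback to the previous version, can be sketched as successive map merges. This is a simplified illustration that covers only metadata properties and assumes that "merging" means later values replace earlier ones key by key. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: default -> schema (or previous version) -> override.
class MetadataMerge {

  static Map<String, String> resolve(Map<String, String> defaults,
                                     Map<String, String> fromSchema,
                                     Map<String, String> previousVersion,
                                     Map<String, String> overrides) {
    Map<String, String> result = new LinkedHashMap<>();
    if (defaults != null) {
      result.putAll(defaults);                 // step 1: start from the default value
    }
    // Step 2: use the schema's own metadata, else fall back to the previous version.
    Map<String, String> second = (fromSchema != null) ? fromSchema : previousVersion;
    if (second != null) {
      result.putAll(second);
    }
    if (overrides != null) {
      result.putAll(overrides);                // step 3: the override value wins
    }
    return result;
  }
}
```

Because overrides are applied last, a value set there always appears in the registered schema's metadata.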

By setting the “overrideMetadata” with the following value, we will ensure that every schema has an “application.major.version” of “2” when it is registered. This allows us to not have to explicitly pass this metadata value with every schema during registration. Also, by specifying below that the “compatibilityGroup” is “application.major.version”, only schemas that have the same value for “application.major.version” will be compared against one another during compatibility checks.

{
  "compatibilityGroup": "application.major.version",
  "overrideMetadata": {
    "properties": {
      "application.major.version": "2"
    }
  }
}

Migration rules

With the above configurations, you can now pin a consumer to a specific application major version, and ensure that compatibility checks only happen as long as the application major version does not change.

When you introduce a schema that belongs to a new major version, you must attach migration rules to the schema to indicate how to transform the message from the previous major version to the new major version (for new consumers reading old messages) and how to transform the message from the new major version to the previous major version (for old consumers reading new messages). If a consumer encounters a schema with a major version that is two or more versions away, such as going from major version 1 to 3 (or from 3 to 1), then it will run migration rules transitively, in this case from 1 to 2, and then from 2 to 3.
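The transitive behavior can be illustrated by listing the migration steps between two major versions. The sketch below only computes the sequence of steps and does not run any rules. It assumes that major versions are consecutive integers and that a downgrade walks the same chain in the opposite direction. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: lists the migration steps between two major versions.
class MigrationPath {

  static List<String> steps(int fromMajor, int toMajor) {
    List<String> steps = new ArrayList<>();
    if (fromMajor < toMajor) {
      for (int v = fromMajor; v < toMajor; v++) {
        steps.add("UPGRADE " + v + "->" + (v + 1));
      }
    } else {
      for (int v = fromMajor; v > toMajor; v--) {
        steps.add("DOWNGRADE " + v + "->" + (v - 1));
      }
    }
    return steps; // empty when both versions are the same
  }
}
```

For example, steps(1, 3) yields the two upgrade steps 1->2 and 2->3, and steps(3, 1) yields the two downgrade steps 3->2 and 2->1.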

../../_images/data-contracts-migration-rules.png

In the diagram above, the producer executes the rules applicable to WRITE mode, while the consumer first runs rules in either UPGRADE or DOWNGRADE mode (depending on the schema version of the payload and the desired version), then executes rules applicable to READ mode. Note that the rules are executed in reverse order during READ mode. This is to ensure any actions performed during WRITE are undone in the proper order during READ.
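The ordering of domain rules can be sketched as follows: rules run in declaration order on WRITE and in reverse order on READ. The rule names used in the usage note are hypothetical, and the helper is an illustration rather than client code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative only: shows the order in which domain rules would run.
class RuleOrder {

  static List<String> executionOrder(List<String> declaredRules, String mode) {
    List<String> order = new ArrayList<>(declaredRules);
    if (mode.equals("READ")) {
      Collections.reverse(order); // undo WRITE-side work in the opposite order
    }
    return order;
  }
}
```

If a schema declares the rules compress and then encrypt, WRITE runs them in that order, and READ runs encrypt first and compress second.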

How do migration rules work? They take advantage of the fact that Avro, JSON Schema, and Protobuf all specify a JSON serialization format for messages. The migration rules can be specified in any declarative language that supports JSON transformations.

Declarative JSONata rules

JSONata is a sophisticated transformation language for JSON that was inspired by XPath. To use the rule executor for JSONata, include the following dependency:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-rules</artifactId>
    <version>7.4.0</version>
</dependency>

The JSONata rule executor will register itself using the “JSONATA” rule type.

Suppose you have a simple schema with an ssn field.

{
  "type":"record",
  "name":"MyRecord",
  "fields":[{
    "name":"ssn",
    "type":"string",
    "confluent:tags": [ "PII", "PRIVATE" ]
  }]
}

Suppose you want to rename the field to “socialSecurityNumber”. Below is a set of migration rules to achieve this. In this case, you can use the JSONata function called sift to remove the field with the old name, and then use a JSON property to specify the new field.

{
  "schema": "...",
  "ruleSet": {
    "migrationRules": [
      {
        "name": "changeSsnToSocialSecurityNumber",
        "kind": "TRANSFORM",
        "type": "JSONATA",
        "mode": "UPGRADE",
        "expr": "$merge([$sift($, function($v, $k) {$k != 'ssn'}), {'socialSecurityNumber': $.'ssn'}])"
      },
      {
        "name": "changeSocialSecurityNumberToSsn",
        "kind": "TRANSFORM",
        "type": "JSONATA",
        "mode": "DOWNGRADE",
        "expr": "$merge([$sift($, function($v, $k) {$k != 'socialSecurityNumber'}), {'ssn': $.'socialSecurityNumber'}])"
      }
    ]
  }
}

The UPGRADE rule allows new consumers to read old messages, while the DOWNGRADE rule allows old consumers to read new messages. If you plan to upgrade all consumers, you can of course omit the DOWNGRADE rule.
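To make the effect of the two JSONata expressions concrete, the following sketch performs the same kind of rename on a message represented as a Java Map. It does not evaluate JSONata, the class and method names are hypothetical, and it assumes a flat message.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: the rename that the UPGRADE and DOWNGRADE rules describe.
class RenameFieldSketch {

  static Map<String, Object> rename(Map<String, Object> message,
                                    String oldName, String newName) {
    Map<String, Object> result = new LinkedHashMap<>();
    // Keep every field except the old one (the role played by $sift).
    for (Map.Entry<String, Object> field : message.entrySet()) {
      if (!field.getKey().equals(oldName)) {
        result.put(field.getKey(), field.getValue());
      }
    }
    // Add the value under the new name (the role played by the merged object).
    if (message.containsKey(oldName)) {
      result.put(newName, message.get(oldName));
    }
    return result;
  }
}
```

Calling rename(message, "ssn", "socialSecurityNumber") corresponds to the UPGRADE rule, and swapping the two names corresponds to the DOWNGRADE rule.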

To learn more about JSONata, see https://jsonata.org and Understanding JSONata.

Custom rules

As mentioned previously, a custom rule can be used by implementing the RuleExecutor interface:

public interface RuleExecutor extends RuleBase {
  String type();

  Object transform(RuleContext ctx, Object message) throws RuleException;
}

For rules of kind CONDITION, the transform method should return a Boolean value, and for rules of kind TRANSFORM, the transform method should return the transformed message.
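As a sketch of a custom executor for a CONDITION rule, the class below returns a Boolean that indicates whether a field named f1 is shorter than 10 characters. RuleBase, RuleContext, RuleException, and RuleExecutor are local stand-ins that mirror the interface shown above so that the example compiles on its own. They are not the Confluent classes, and the rule type name SHORT_F1 and the Map-based message are assumptions.

```java
import java.util.Map;

// Illustrative only: compiles without Confluent libraries by using stand-in types.
class ShortFieldExecutorSketch {

  // Local stand-ins (assumptions) for the real client types.
  interface RuleBase {}
  static class RuleContext {}
  static class RuleException extends Exception {
    RuleException(String message) { super(message); }
  }
  interface RuleExecutor extends RuleBase {
    String type();
    Object transform(RuleContext ctx, Object message) throws RuleException;
  }

  // CONDITION-style executor: the result is a Boolean, not a transformed message.
  static class ShortF1Executor implements RuleExecutor {
    @Override
    public String type() {
      return "SHORT_F1"; // rules with this "type" value would use this executor
    }

    @Override
    public Object transform(RuleContext ctx, Object message) {
      if (!(message instanceof Map)) {
        return Boolean.FALSE; // assumption: messages arrive as a Map in this sketch
      }
      Object f1 = ((Map<?, ?>) message).get("f1");
      return f1 instanceof String && ((String) f1).length() < 10;
    }
  }
}
```

A TRANSFORM executor would have the same shape but return the modified message instead of a Boolean.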

Quick start

This Quick Start shows an example of a data quality rule using an Avro schema. You’ll create a schema with a single field “f1”, and add a data quality rule to ensure that the length of the field is always less than 10.

Requirements

Schema rules are only available on Confluent Enterprise (not the Community edition) and Confluent Cloud with the Stream Governance “Advanced” package.

Start the producer

To run the producer on Confluent Platform:

./bin/kafka-avro-console-producer \
   --topic test \
   --bootstrap-server localhost:9092 \
   --property schema.registry.url=http://localhost:8081 \
   --property value.schema='{"type":"record","name":"myrecord","fields":
   [{"name":"f1","type":"string"}]}' \
   --property value.rule.set='{ "domainRules":
   [{ "name": "checkLen", "kind": "CONDITION", "type": "CEL",
      "mode": "WRITE", "expr": "size(message.f1) < 10",
      "onFailure": "ERROR"}]}'

{"f1": "success"}
{"f1": "this will fail"}

Start the consumer

To run the consumer on Confluent Platform:

./bin/kafka-avro-console-consumer \
  --topic test \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081

Reference

CEL rule executor

The CEL rule executor sets the following variables that are available in expressions.

  • message - The message

CEL_FIELD rule executor

The CEL_FIELD rule executor sets the following variables that are available in expressions.

  • value - Field value
  • fullName - Fully-qualified name of the field
  • name - Field name
  • typeName - Name of the field type, one of STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN
  • tags - Tags that apply to the field
  • message - The containing message

JSONATA rule executor

The JSONATA rule executor supports the following configuration parameters that can be passed when specifying the executor.

  • timeout.ms - Maximum execution time in milliseconds
  • max.depth - Maximum number of recursive calls