Use Broker-Side Schema Validation on Confluent Cloud

Schema Validation enables the broker to verify that data produced to a Kafka topic uses a valid schema ID in Schema Registry that is registered according to the subject naming strategy. (See also, Schemas, Subjects, and Topics.)

Schema Validation does not perform data introspection. Instead, it checks that the schema ID embedded in the Wire Format is registered in Schema Registry under a valid subject.

You must use a serializer and deserializer (serdes) that respect the Wire Format, or use a Confluent supported serde, as described in Formats, Serializers, and Deserializers.
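
As a rough illustration of what respecting the Wire Format means: the Confluent wire format prefixes each serialized key or value with a magic byte (0) followed by a 4-byte big-endian schema ID. The Python sketch below reads that header. The function name extract_schema_id and the sample ID are hypothetical, and this is not a description of how the broker is implemented.

```python
import struct

MAGIC_BYTE = 0   # first byte of every wire-format payload
HEADER = ">bI"   # big-endian: 1-byte magic + 4-byte unsigned schema ID


def extract_schema_id(payload: bytes) -> int:
    """Return the schema ID framed at the start of payload, or raise ValueError."""
    size = struct.calcsize(HEADER)  # 5 bytes
    if len(payload) < size:
        raise ValueError("payload is too short to carry a schema ID")
    magic, schema_id = struct.unpack(HEADER, payload[:size])
    if magic != MAGIC_BYTE:
        raise ValueError("payload does not start with the magic byte")
    return schema_id


# A framed payload: header plus (opaque) serialized data.
framed = struct.pack(HEADER, MAGIC_BYTE, 100001) + b"serialized-data"
print(extract_schema_id(framed))  # 100001
```

A plain UTF-8 string such as Pierre has no such header, so extract_schema_id(b"Pierre") raises ValueError. This is why records written with a plain string serializer cannot pass Schema Validation.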

Prerequisites

  • Schema Validation on Confluent Cloud is available only on Dedicated clusters, and only through the hosted Schema Registry. Confluent Cloud brokers cannot use self-managed instances of Schema Registry. (Schema Validation is also available for on-premises deployments through Confluent Enterprise.)
  • You must have a Schema Registry enabled for the environment in which you are using Schema Validation.
  • Schema Validation is scoped to an environment. All Dedicated clusters in the same environment share a Schema Registry, and clusters cannot see schemas in other environments.

Schema Validation configuration options on a topic

Schema Validation is set at the topic level with the following parameters.

Property                               Description
confluent.key.schema.validation        When set to true, enables schema ID validation on the message key. The default is false.
confluent.value.schema.validation      When set to true, enables schema ID validation on the message value. The default is false.
confluent.key.subject.name.strategy    Sets the subject name strategy for the message key. The default is io.confluent.kafka.serializers.subject.TopicNameStrategy.
confluent.value.subject.name.strategy  Sets the subject name strategy for the message value. The default is io.confluent.kafka.serializers.subject.TopicNameStrategy.

Tip

  • Value schema and key schema validation are independent of each other; you can enable either or both.
  • The subject naming strategy is tied to Schema Validation. Setting a strategy has no effect when Schema Validation is not enabled.
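
To make the four properties concrete, here is a small Python sketch that assembles them into the key=value form taken by the --config option of the Confluent CLI. The helper name validation_config is hypothetical, and joining several overrides with commas is an assumption about the --config syntax, so check it against your CLI version.

```python
STRATEGY_PREFIX = "io.confluent.kafka.serializers.subject."


def validation_config(validate_key=False, validate_value=False,
                      key_strategy="TopicNameStrategy",
                      value_strategy="TopicNameStrategy"):
    """Build topic-level overrides for Schema Validation (hypothetical helper)."""
    config = {}
    if validate_key:
        config["confluent.key.schema.validation"] = "true"
        config["confluent.key.subject.name.strategy"] = STRATEGY_PREFIX + key_strategy
    if validate_value:
        config["confluent.value.schema.validation"] = "true"
        config["confluent.value.subject.name.strategy"] = STRATEGY_PREFIX + value_strategy
    return config


overrides = validation_config(validate_value=True, value_strategy="RecordNameStrategy")
# Assumption: multiple overrides can be passed as one comma-separated list.
print(",".join(f"{k}={v}" for k, v in overrides.items()))
```

The helper emits a strategy only for the side (key or value) that has validation enabled, because a strategy on its own has no effect.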

Get the latest version of the Confluent CLI

Enable Schema Validation from the Confluent CLI

You can enable Schema Validation on a topic when you create a topic or modify an existing topic.

The command syntax to enable Schema Validation is as follows:

confluent kafka topic <create|update> <topic-name> --config confluent.<key|value>.schema.validation=true

For example, this command creates a topic called flights with schema validation enabled on the value schema:

confluent kafka topic create flights --config confluent.value.schema.validation=true

With this configuration, if a message is produced to the topic flights and its value does not reference a valid schema ID, an error is returned to the producer and the message is discarded.

If a batch of messages is sent and at least one is invalid, then the entire batch is discarded.
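
The all-or-nothing batch behavior can be pictured with the following Python sketch. It is a simplified model, not the broker's implementation: append_batch and has_valid_schema_id are hypothetical names, and the validity check is a stand-in that only looks for a leading zero byte rather than consulting Schema Registry.

```python
def has_valid_schema_id(value: bytes) -> bool:
    """Stand-in check: a real broker would look the schema ID up in Schema Registry."""
    return len(value) >= 5 and value[0] == 0


def append_batch(log: list, batch: list) -> bool:
    """Append the whole batch to log only if every record passes; else append nothing."""
    if not all(has_valid_schema_id(v) for v in batch):
        return False  # the producer would receive an error for the batch
    log.extend(batch)
    return True


log = []
good = b"\x00\x00\x00\x00\x01payload"   # carries a wire-format header
bad = b"Frederik"                       # plain string, no header
print(append_batch(log, [good, bad]), len(log))   # False 0
print(append_batch(log, [good, good]), len(log))  # True 2
```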

If you do not specify a different subject naming strategy, io.confluent.kafka.serializers.subject.TopicNameStrategy is used by default. You can modify the naming strategies used for either or both the message key and message value schemas. For example, the following command sets the subject naming strategy on the topic flights to use io.confluent.kafka.serializers.subject.RecordNameStrategy.

confluent kafka topic update flights --config confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

The following naming strategies are available as accepted values for confluent.value.subject.name.strategy.

Strategy                 Description
TopicNameStrategy        Derives the subject name from the topic name. (This is the default.)
RecordNameStrategy       Derives the subject name from the record name, and provides a way to group logically related events that may have different data structures under a subject.
TopicRecordNameStrategy  Derives the subject name from the topic and record name, as a way to group logically related events that may have different data structures under a subject.

Note

The full class names for the above strategies consist of the strategy name prefixed by io.confluent.kafka.serializers.subject., as shown in the examples in this section.
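
As an illustration of how the three strategies differ, the following Python sketch derives the subject name each one maps a message to. The function subject_name and the record name com.example.FlightDelayed are hypothetical. The naming patterns (<topic>-key or <topic>-value, <record name>, and <topic>-<record name>) follow the documented behavior of the Confluent strategies, where the record name is the fully qualified name of the record.

```python
def subject_name(strategy: str, topic: str, record_name: str, is_key: bool = False) -> str:
    """Return the subject a given strategy maps a message to (illustrative only)."""
    if strategy == "TopicNameStrategy":
        return f"{topic}-{'key' if is_key else 'value'}"
    if strategy == "RecordNameStrategy":
        return record_name
    if strategy == "TopicRecordNameStrategy":
        return f"{topic}-{record_name}"
    raise ValueError(f"unknown strategy: {strategy}")


for name in ("TopicNameStrategy", "RecordNameStrategy", "TopicRecordNameStrategy"):
    print(name, "->", subject_name(name, "flights", "com.example.FlightDelayed"))
# TopicNameStrategy -> flights-value
# RecordNameStrategy -> com.example.FlightDelayed
# TopicRecordNameStrategy -> flights-com.example.FlightDelayed
```

Only TopicNameStrategy ties a topic to a single schema per key or value. The other two allow several record types in one topic, which is why the subject depends on the record name.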

Enable Schema Validation on a topic from the Confluent Cloud Console

To set Schema Validation on a topic from the Cloud Console:

  1. Navigate to a topic.

  2. Click the Configuration tab.

  3. Click Edit Settings.

  4. Click Switch to expert mode.

  5. In Expert mode, change the settings for confluent.value.schema.validation and/or confluent.key.schema.validation from false to true.

    Tip

    If you do not specify a different naming strategy, TopicNameStrategy is used by default.

    You can modify the naming strategies used for either or both the message key and message value schemas. These settings are also available in Expert mode on the selected topic. Set these now, if desired.

  6. Click Save changes.

To disable Schema Validation, set these same configuration options to false.

Schema Validation demo

You can test Schema Validation by following along with this short demo.

  1. Create a test topic called players-maple either from the web UI or the Confluent CLI. Do not specify the Schema Validation setting, so that it defaults to false for the topic.

    Here is the command to use from the Confluent CLI:

    confluent kafka topic create players-maple
    

    This creates a topic with no broker validation on records produced to the test topic, which is what you want for the first part of the demo.

  2. In a new command window for the producer (logged into Confluent Cloud and on the same environment and cluster), run this command to produce a serialized record (using the default string serializer) to the topic players-maple.

    confluent kafka topic produce players-maple --parse-key=true --delimiter=,
    

    The command is successful because you currently have Schema Validation disabled for this topic. If broker Schema Validation had been enabled for this topic, the above command to produce to it would not be permitted.

    Type your first message at the producer prompt as follows:

    1,Pierre
    

    Keep this session of the producer running.

  3. Open a new command window for the consumer (logged into Confluent Cloud and on the same environment and cluster), and enter this command to read the messages:

    confluent kafka topic consume players-maple --from-beginning --print-key=true
    

    The output of this command is 1    Pierre.

    Keep this session of the consumer running.

  4. Now, set Schema Validation for the topic players-maple to true.

    confluent kafka topic update players-maple --config confluent.value.schema.validation=true
    

    Tip

    You can also update this setting on the Confluent Cloud Console in expert mode for the configuration on the players-maple topic.

  5. Return to the producer session, and type a second message at the prompt.

    2,Frederik
    

    You will get an error because Schema Validation is enabled and the message you are sending does not contain a schema ID:

    Error: producer has detected an INVALID_RECORD error for topic players-maple

    If you subsequently disable Schema Validation (use the same command to set it to false), then type and resend the same or another similarly formatted message, the message will go through. (For example, produce 3,Ben.)

    The messages that were successfully produced also show in your web browser in Topics > players-maple > Messages. You may have to select a partition or jump to a timestamp to see messages sent earlier.

What Schema Validation checks and how it works

When Schema Validation is enabled on a topic, it checks for the following on each message:

  • The message produced to the topic has an associated schema. (The message must have an associated schema ID, which indicates it has a schema.)
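  • The schema must match the topic. (The schema ID must be registered under the subject that the configured subject naming strategy derives for the topic.)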
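
Put together, the two checks amount to a lookup along these lines. This is a simplified Python model under stated assumptions: the registry is a plain dictionary from schema ID to the subjects it is registered under, passes_validation is a hypothetical name, and the expected subject assumes TopicNameStrategy on the message value.

```python
def passes_validation(schema_id, topic, registry):
    """Model of the broker-side check for a message value under TopicNameStrategy."""
    subjects = registry.get(schema_id)
    if subjects is None:
        return False                      # check 1: the ID is not registered at all
    return f"{topic}-value" in subjects   # check 2: the ID belongs to this topic's subject


# Hypothetical registry contents: schema ID -> subjects that contain it.
registry = {
    1: {"flights-value"},
    2: {"players-maple-value"},
}

print(passes_validation(1, "flights", registry))  # True
print(passes_validation(2, "flights", registry))  # False: registered, but under another subject
print(passes_validation(9, "flights", registry))  # False: unknown schema ID
```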

The demo above is a straightforward way to show that Schema Validation is working, using the Confluent CLI.

In practice, a client application typically sends an Avro object, Protobuf object, or Jackson-serializable POJO. In this case, the client's serializer derives the schema from the object and sends it to Schema Registry, which checks whether the schema exists in the subject. If it does, the serializer uses the schema ID of that version. If it does not, Schema Registry returns an error if the client has auto schema registration set to false, or registers the schema if the client has auto schema registration set to true.

Auto schema registration is set in the client application. By default, client applications automatically register new schemas. You can disable auto schema registration on your clients, which is typically recommended in production environments. To learn more, see Disabling Auto Schema Registration in the Confluent Platform documentation.
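
The client-side flow described above can be sketched in Python with an in-memory stand-in for Schema Registry. Everything here is illustrative: FakeRegistry and resolve_schema_id are hypothetical names, ID assignment is simplified, and a real serializer talks to Schema Registry over its REST API. In Confluent serializers, the switch modeled by auto_register is the auto.register.schemas property.

```python
class FakeRegistry:
    """In-memory stand-in mapping subject -> {schema string: schema ID}."""

    def __init__(self):
        self._subjects = {}
        self._next_id = 1

    def lookup(self, subject, schema):
        """Return the ID of schema under subject, or None if it is not there."""
        return self._subjects.get(subject, {}).get(schema)

    def register(self, subject, schema):
        """Store schema under subject and return its newly assigned ID."""
        schema_id = self._next_id
        self._next_id += 1
        self._subjects.setdefault(subject, {})[schema] = schema_id
        return schema_id


def resolve_schema_id(registry, subject, schema, auto_register=True):
    """Return the ID for schema under subject, registering it only if allowed."""
    schema_id = registry.lookup(subject, schema)
    if schema_id is not None:
        return schema_id                       # schema already exists in the subject
    if not auto_register:
        raise LookupError(f"schema not found in subject {subject}")
    return registry.register(subject, schema)  # first use registers the schema


registry = FakeRegistry()
schema = '{"type": "string"}'
print(resolve_schema_id(registry, "flights-value", schema))                       # 1 (registered)
print(resolve_schema_id(registry, "flights-value", schema, auto_register=False))  # 1 (found)
```

With auto_register=False and a schema the subject has never seen, resolve_schema_id raises LookupError, which mirrors the error a client receives when auto schema registration is disabled.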

Troubleshooting

If the tutorial and examples above do not work or you do not see or have access to confluent.value.schema.validation, check for the following:

  • Schema Validation is available only on Dedicated clusters through the hosted Schema Registry. Make sure that the cluster you are using is a Dedicated Kafka cluster (not Basic, Standard, or Enterprise).
  • Schema Validation requires that Schema Registry is enabled on the environment. Make sure you have Schema Registry enabled for the environment in which you are using Schema Validation.