.. _schema_validation:

Validate Schemas Broker-side in |cp|
====================================

|sv| enables the broker to verify that data produced to a |ak| topic is using a
valid schema ID in |sr| that is registered according to the
:ref:`subject naming strategy `. Schema Validation does not perform data
introspection, but rather checks that the schema ID in the Wire Format is
registered in |sr| under a valid subject. You must use a serializer and
deserializer (serdes) that respect the :ref:`messages-wire-format`, or use a
Confluent supported serde, as described in
`Formats, Serializers, and Deserializers `__.

.. tip:: |sv| is also available on |ccloud| clusters using the hosted,
   per-environment |sr|. To learn more, see
   :cloud:`Using Broker-Side Schema Validation on Confluent Cloud|sr/broker-side-schema-validation.html`.

Limitations
-----------

- Schema validation is supported only in the default context. Otherwise, you
  run the risk of subject name collisions in some cases. To learn more about
  schema contexts, see :ref:`schema-contexts-cp` and
  :ref:`schema-linking-cp-overview`.

.. _sv-set-sr-url-on-brokers:

Prerequisites and Setting |sr| URLs on the Brokers
--------------------------------------------------

Basic requirements to run these examples are generally the same as those
described for the :ref:`Schema Registry Tutorial `, with the exception of
Maven, which is not needed here. Also, |cp| version 5.4.0 or later is required.

As an additional prerequisite to enable |sv| on the brokers, you must specify
``confluent.schema.registry.url`` in the |ak| ``server.properties`` file
(``$CONFLUENT_HOME/etc/kafka/server.properties``) before you start |cp|. This
tells the broker how to connect to |sr|. For example:

::

   confluent.schema.registry.url=http://schema-registry:8081

This configuration accepts a comma-separated list of URLs for |sr| instances.
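Because the broker checks only the schema ID embedded in the wire format, not
the payload itself, the check amounts to parsing a small header: each
wire-format message begins with a magic byte (``0``) followed by the schema ID
as a four-byte, big-endian integer. The following Python sketch illustrates
that ID extraction (an illustration only, not Confluent's broker code):

```python
import struct

MAGIC_BYTE = 0  # The Confluent wire format starts with a zero magic byte.

def extract_schema_id(message: bytes) -> int:
    """Return the schema ID embedded in a wire-format message.

    Raises ValueError if the message does not follow the wire format.
    """
    if len(message) < 5 or message[0] != MAGIC_BYTE:
        raise ValueError("Unknown magic byte: not Confluent wire format")
    # Bytes 1-4 hold the schema ID as a big-endian 32-bit integer.
    (schema_id,) = struct.unpack(">i", message[1:5])
    return schema_id

# A message serialized against schema ID 42: magic byte, ID, then payload bytes.
msg = bytes([MAGIC_BYTE]) + struct.pack(">i", 42) + b"serialized-payload"
print(extract_schema_id(msg))  # 42
```

Validation then reduces to checking that the extracted ID is registered in
|sr| under a subject that matches the topic's naming strategy.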
This setting is required to make |sv| available both from the
:ccloud-cli:`Confluent CLI|command-reference/index.html` and from
:ref:`control_center`.

Enabling |sv| on a topic from the |confluent-cli|
-------------------------------------------------

You can enable |sv| on a topic when you create the topic or when you modify an
existing topic.

------------------------
Create a Topic with |sv|
------------------------

To set |sv| on a topic when you create the topic, set
``confluent.value.schema.validation=true`` and
``confluent.key.schema.validation=true``. Value schema and key schema
validation are independent of each other; you can enable either or both. (By
default, schema validation is not enabled; both key and value schema
validation default to ``false``.)

For example, this command creates a topic called ``my-topic-sv`` with schema
validation enabled on the value schema:

::

   kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 \
   --partitions 1 --topic my-topic-sv \
   --config confluent.value.schema.validation=true

The output of this command is:

::

   Created topic my-topic-sv.

With this configuration, if a message is produced to the topic ``my-topic-sv``
that does not have a valid schema for the value of the message, an error is
returned to the producer, and the message is discarded. If a batch of messages
is sent and at least one is invalid, the entire batch is discarded.

.. tip::

   - To learn more about value and key schemas, see
     :ref:`schema_registry_terminology` in the |sr| tutorial.
   - To learn how to create a schema for a topic, see :ref:`topicschema`.

-----------------------------
Add |sv| to an Existing Topic
-----------------------------

Create a new topic called ``my-first-topic``:

::

   kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-first-topic

The output of this command is:

::

   Created topic my-first-topic.
To change topic validation configuration on an existing topic (in this case,
from ``false`` to ``true``), specify validation using the ``--alter`` and
``--add-config`` flags, as shown in this example:

::

   kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-first-topic --add-config confluent.value.schema.validation=true

You should get this confirmation:

::

   Completed updating config for topic 'my-first-topic'.

To disable topic validation on this topic (from ``true`` to ``false``), rerun
the above command with ``--add-config confluent.value.schema.validation=false``.

.. _sr-per-topic-subject-name-strategy:

----------------------------------------------
Change the subject naming strategy for a topic
----------------------------------------------

By default, |cs| uses ``TopicNameStrategy`` as the :ref:`naming strategy ` to
map topics with schemas in |sr|. Before |cp| 5.5.0, the subject name strategy
was configured on the brokers in ``server.properties``, which required that
you use the same strategy for all topics on a broker. Starting with |cp|
5.5.0, the naming strategy is associated with the topic. Therefore, you now
have the option to configure a naming strategy other than the default `on a
per-topic basis`, for both the schema subject key and value, with
``confluent.key.subject.name.strategy`` and
``confluent.value.subject.name.strategy``.

From the |confluent-cli|, use the ``--config`` option to create or modify a
topic with the specified naming strategy. For example:

To create a topic that uses ``RecordNameStrategy`` for the value:

.. code:: bash

   ./bin/kafka-topics --create --bootstrap-server localhost:9092 \
   --replication-factor 1 --partitions 1 --topic my-other-cool-topic \
   --config confluent.value.schema.validation=true --config confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

To modify a topic to use ``RecordNameStrategy`` for the key:

.. code:: bash

   kafka-configs --bootstrap-server localhost:9092 \
   --alter --entity-type topics --entity-name my-other-cool-topic \
   --add-config confluent.key.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

The possible configurations for both ``confluent.value.subject.name.strategy``
and ``confluent.key.subject.name.strategy`` are:

- ``io.confluent.kafka.serializers.subject.TopicNameStrategy`` (the default)
- ``io.confluent.kafka.serializers.subject.RecordNameStrategy``
- ``io.confluent.kafka.serializers.subject.TopicRecordNameStrategy``

.. seealso:: :ref:`sr-subjects-topics-primer`

Enabling |sv| on a topic in |c3|
--------------------------------

You can view, enable, or disable |sv| on a topic in |c3-short| (see
:ref:`topicschema` in the |c3-short| guide). Additionally, you can configure
per-topic subject naming strategies.

----------------------------------------
View or Change |sv| settings for a topic
----------------------------------------

To view current configurations or enable |sv| on a topic from |c3-short|
(`http://localhost:9021/ <http://localhost:9021/>`_):

#. Click the **Configuration** tab on an existing topic, and click
   **Edit settings**.

#. Click **Switch to expert mode**.

   .. figure:: ../images/sv-c3-topic-expert-settings.png
      :align: center
      :scale: 85%

#. In expert mode, change the settings for
   ``confluent.value.schema.validation`` and
   ``confluent.key.schema.validation`` from ``false`` to **true**. You may
   need to scroll down to find ``confluent.key.schema.validation``.

   .. figure:: ../images/sv-c3-topic-enable.png
      :align: center
      :scale: 75%

#. Click **Save changes**.

----------------------------------------------
Change the subject naming strategy for a topic
----------------------------------------------

To change the
:ref:`subject naming strategy <sr-per-topic-subject-name-strategy>` using
|c3-short|:

#. On |c3-short|, select the topic to update, click **Configuration**, and
   then click **Switch to expert mode**.
   Search for ``confluent.value.subject.name.strategy`` and
   ``confluent.key.subject.name.strategy``.

   .. figure:: ../images/c3-schema-subject-name-strategy.png
      :align: center

#. Modify the settings and click **Save changes**.

The possible configurations for both ``confluent.value.subject.name.strategy``
and ``confluent.key.subject.name.strategy`` are:

- ``io.confluent.kafka.serializers.subject.TopicNameStrategy`` (the default)
- ``io.confluent.kafka.serializers.subject.RecordNameStrategy``
- ``io.confluent.kafka.serializers.subject.TopicRecordNameStrategy``

To learn more, see :ref:`sr-per-topic-subject-name-strategy`, which describes
how to perform the same tasks on the |confluent-cli| and gives more background
information on the feature.

Demo: Enabling |sv| on a Topic at the Command Line
--------------------------------------------------

This short demo shows the effect of enabling or disabling schema validation on
a topic. If you are just getting started with |cp| and |sr|, you might want to
first work through the :ref:`schema_registry_onprem_tutorial`, and then return
to this demo. The examples make use of ``kafka-console-producer`` and
``kafka-console-consumer``, which are located in ``$CONFLUENT_HOME/bin``.

#. On a local install of |cp| version 5.4.0 or later, modify
   ``$CONFLUENT_HOME/etc/kafka/server.properties`` to include the following
   configuration for the |sr| URL:

   .. codewithvars:: bash

      ############################## My Schema Validation Demo Settings ################

      # Schema Registry URL
      confluent.schema.registry.url=http://localhost:8081

   The example above includes two lines of comments, which are optional, to
   keep track of the configurations in the file.

#. Start |cp| using the following command:

   .. codewithvars:: bash

      |confluent_start|

   .. include:: includes/sr-start-confluent-tips.rst

#. Create a test topic called ``test-schemas`` without specifying the |sv|
   setting, so that it defaults to ``false``:

   .. code:: bash

      kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic test-schemas

   This creates a topic with no broker validation on records produced to it,
   which is what you want for the first part of the demo. You can verify that
   the topic was created with
   ``kafka-topics --bootstrap-server localhost:9092 --list``.

#. In a new command window for the producer, run this command to produce a
   serialized record (using the default string serializer) to the topic
   ``test-schemas``:

   .. code:: bash

      kafka-console-producer --broker-list localhost:9092 --topic test-schemas --property parse.key=true --property key.separator=,

   The command succeeds because you currently have |sv| disabled for this
   topic. If broker |sv| had been enabled for this topic, the above command to
   produce to it would not be permitted. The output of this command is a
   producer prompt (``>``), where you can type the messages you want to
   produce. Type your first message at the ``>`` prompt as follows:

   .. code:: bash

      1,my first record

   Keep this session of the producer running.

#. Open a new command window for the consumer, and enter this command to read
   the messages:

   .. code:: bash

      kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test-schemas --property print.key=true

   The output of this command is ``my first record``. Keep this session of the
   consumer running.

#. Now, set |sv| for the topic ``test-schemas`` to ``true``:

   .. code:: bash

      kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name test-schemas --add-config confluent.value.schema.validation=true

   You should get a confirmation:
   ``Completed updating config for topic test-schemas.``

#. Return to the producer session, and type a second message at the ``>``
   prompt:

   .. code:: bash

      2,my second record

   You will get an error because |sv| is enabled and the messages you are
   sending do not contain schema IDs:

   ``This record has failed the validation on broker``

   If you subsequently disable |sv| (use the same command to set it to
   ``false``), restart the producer, and then type and resend the same or
   another similarly formatted message, the message will go through. (For
   example, produce ``3,my third record``.)

   .. tip:: As an alternative to restarting the producer in this last step
      (after switching schema validation back to ``false``), you `can` simply
      type or copy-paste the suggested message ``3,my third record`` on the
      blank line after the error and press return. The consumer will pick it
      up, and you will then get the producer prompt back. However, this is an
      unintuitive workflow because no prompt shows after the error, and you
      may be tempted to press return first, which shuts down the producer.

   The messages that were successfully produced also show in |c3-short|
   (`http://localhost:9021/ <http://localhost:9021/>`_ in your web browser)
   under **Topics > test-schemas > Messages**. You may have to select a
   partition or jump to a timestamp to see messages sent earlier.

   .. figure:: ../images/sv-topics.png
      :align: center

#. .. include:: includes/cp-local-examples-cleanup.rst

Configuring the range for valid schema IDs
------------------------------------------

Schema Validation tracks the maximum schema ID seen in |sr|. If a schema ID is
seen that is too far above the maximum schema ID, it is automatically
considered invalid. Because the maximum schema ID may be slightly out of date,
you can configure an additional range above the maximum schema ID within which
Schema Validation verifies whether the schema ID exists in |sr|.

The property ``confluent.missing.id.query.range`` indicates the range above
the maximum schema ID beyond which schemas are automatically considered
invalid. The default value for ``confluent.missing.id.query.range`` is ``200``.
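The windowing behavior this property controls can be sketched in a few lines
of Python (the function name is hypothetical; the actual check happens inside
the broker):

```python
def may_query_registry(schema_id: int, max_seen_id: int, query_range: int = 200) -> bool:
    """Illustrative check: should the broker ask Schema Registry about this ID?

    IDs up to the tracked maximum plus query_range are verified against
    Schema Registry, because the cached maximum may be slightly stale.
    IDs beyond that window are considered invalid immediately.
    """
    return schema_id <= max_seen_id + query_range

# With the default range of 200 and a tracked maximum schema ID of 1000:
print(may_query_registry(1150, 1000))  # True: within the window, so verified against Schema Registry
print(may_query_registry(1300, 1000))  # False: beyond the window, so rejected outright
```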
With the default setting, if the schema ID is greater than the maximum schema
ID plus 200, it is automatically considered invalid and an alert is triggered.

.. _sv-broker-sr-security-settings:

Configuring Security for |sv|
-----------------------------

In general, |sr| initiates the connection to the brokers. |sv| is unique in
that the broker(s) initiate the connection to |sr|. They do so in order to
retrieve schemas from the registry and verify that the messages they receive
from producers match the schemas associated with particular topics.

With |sv| enabled, the sequence of tasks looks something like this:

#. A broker receives a message from a producer and sees that it is directed to
   a topic that has an associated schema.
#. The broker initiates a connection to |sr|.
#. The broker asks for the schema associated with the topic (by schema ID).
#. |sr| receives the request, finds the requested schema in its schema
   storage, and returns it to the broker.
#. The broker validates the schema ID.

Therefore, to set up security on a cluster that has broker-side |sv| enabled
on topics, you must configure settings on the |ak| brokers to support this
broker-initiated connection to |sr|. For multiple brokers, each broker must be
configured; for example, for mTLS, ideally you would have a different
certificate for each broker.

Note that |sr|'s internal |ak| client to the |ak| brokers is not relevant to
the connection between broker-side |sv| and |sr|'s HTTP listeners. The
security settings below do not reflect anything about the |sr| internal
client-to-broker connection.

The broker configurations below include ``confluent.schema.registry.url``,
which tells the broker how to connect to |sr|. You may already have configured
this on your brokers as a
:ref:`prerequisite for using Schema Validation <sv-set-sr-url-on-brokers>`.
The rest of the settings shown are specific to security configurations.

.. tip:: The sections below focus on security configurations on the brokers.
   For information about setting up security on |sr|, see
   :ref:`schemaregistry_security` and related sections. For example, to set up
   |sr| to require clients (like brokers) to authenticate with a username and
   password, see :ref:`basic-auth-sr` under :ref:`http-basic-auth`.

--------------------------------
mTLS (mutual TLS) Authentication
--------------------------------

With :ref:`mutual TLS (mTLS) authentication ` (also known as
:ref:`mtls-authentication-option`), the broker authenticates to |sr| using
TLS/SSL certificates for authentication and :ref:`encryption `. Define the
following settings in the broker properties file for each broker
(``$CONFLUENT_HOME/etc/kafka/server.properties``).

.. codewithvars:: bash

   confluent.schema.registry.url=https://<host>:<port>
   confluent.ssl.truststore.location=<path-to-truststore>
   confluent.ssl.truststore.password=<truststore-password>
   confluent.ssl.keystore.location=<path-to-keystore>
   confluent.ssl.keystore.password=<keystore-password>
   confluent.ssl.key.password=<key-password>

--------------------
Basic Authentication
--------------------

For this setup, the brokers are configured to authenticate to |sr| using
:ref:`basic authentication `. Define the following settings on each broker
(``$CONFLUENT_HOME/etc/kafka/server.properties``).

.. codewithvars:: bash

   confluent.schema.registry.url=http://<host>:<port>
   confluent.basic.auth.credentials.source=USER_INFO
   confluent.basic.auth.user.info=<username>:<password> # required only if credentials source is set to USER_INFO

- The property ``confluent.basic.auth.credentials.source`` defines the type of
  credentials to use (user name and password). These are literals, not
  variables.
- If you set ``confluent.basic.auth.credentials.source`` to ``USER_INFO``, you
  must also specify ``confluent.basic.auth.user.info``.

-----------------------------
Basic Authentication with SSL
-----------------------------

Define the following settings on each broker
(``$CONFLUENT_HOME/etc/kafka/server.properties``).

.. codewithvars:: bash

   confluent.schema.registry.url=https://<host>:<port>
   confluent.basic.auth.credentials.source=USER_INFO
   confluent.basic.auth.user.info=<username>:<password>
   confluent.ssl.truststore.location=<path-to-truststore>
   confluent.ssl.truststore.password=<truststore-password>

-----------------------------------------
Role-based Access Control (RBAC) and ACLs
-----------------------------------------

You can configure :ref:`Role-Based Access Control ` (and/or :ref:`ACLs `) for
authorization to |sr|, and use it with other types of security on the brokers.
The :ref:`confluentsecurityplugins_schema_registry_security_plugin` supports
authorization for both |rbac| and ACLs.

Configure the Brokers
^^^^^^^^^^^^^^^^^^^^^

Define the appropriate settings on the brokers, similar to those shown in
previous sections, depending on the type of security. For example, for basic
authentication, the broker properties files would contain:

.. codewithvars:: bash

   confluent.schema.registry.url=http://<host>:<port>
   confluent.basic.auth.credentials.source=USER_INFO
   confluent.basic.auth.user.info=<username>:<password> # required only if credentials source is set to USER_INFO

Set RBAC Role Bindings to Allow Access to |sr|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

At a minimum, in terms of defining RBAC role bindings, |sr| needs the
following IAM assignment:

.. codewithvars:: bash

   confluent iam rbac role-binding create --principal User:<user> \
   --role DeveloperRead --kafka-cluster-id <kafka-cluster-id> \
   --resource Subject:* --schema-registry-cluster-id <schema-registry-cluster-id>

The |sr| cluster ID is the same as ``schema-registry-group-id``, which
defaults to **schema-registry**.

License
-------

A |cpe| license is required for broker-side |sv|. To learn more, see
:ref:`License ` on the main page of this |sr| documentation.

Related Content
---------------

- Blog post: `Schema Validation with Confluent Platform 5.4 `__
- :ref:`replicator-and-schema-validation` explains how |sv| interacts with
  |crep| configurations used for replicating data from one cluster to another.