.. _schema_registry_onprem_tutorial:

Tutorial: Use |sr| on |cp| to Implement Schemas for a Client Application
========================================================================

.. _schema_registry_tutorial:

This tutorial provides a step-by-step workflow for using |sr-long| on |cp|. You will learn how to
enable client applications to read and write Avro data, check compatibility as schemas evolve, and
use |c3|, which has integrated capabilities with |sr|.

.. include:: ../includes/cp-demo-tip.rst

.. _schema_registry_terminology:

Terminology
~~~~~~~~~~~

First, let's align on terminology and answer the question: what is a **topic** versus a **schema**
versus a **subject**?

Topic
^^^^^

A |ak| `topic` contains messages, and each message is a key-value pair. The key acts as an
identifier (for example, a message key may be a two-digit number), while the value contains
actionable data (such as a customer name or address). Message keys are not required and are not
always used.

Message Keys and Values
^^^^^^^^^^^^^^^^^^^^^^^

Either the message key or the message value, or both, can be serialized as Avro, JSON, or Protobuf.
For example, the message value may be an Avro ``record``, while the key may be a primitive
(``string``, ``integer``, and so forth). Typically message keys, if used, are primitives, but they
can be complex data types as well (for example, ``record`` or ``array``). How you set the key is up
to you and the requirements of your implementation. For detailed examples of key and value schemas,
see :platform:`Formats, Serializers, and Deserializers|schema-registry/fundamentals/serdes-develop/index.html`
in the |sr| documentation.

Schemas and Subjects
^^^^^^^^^^^^^^^^^^^^

A `schema` defines the structure of the data format. The |ak| topic name can be independent of the
schema name. |sr| defines a scope in which schemas can evolve, and that scope is the `subject`. The
name of the subject depends on the configured subject name strategy, which by default derives the
subject name from the topic name. You can modify the subject name strategy on a per-topic basis.
See :ref:`sr-per-topic-subject-name-strategy` to learn more.

Example Use Case
^^^^^^^^^^^^^^^^

As a practical example, let's say a retail business is streaming transactions in a |ak| topic
called ``transactions``. A producer is writing data with a schema ``Payment`` to that |ak| topic
``transactions``. If the producer is serializing the message value as Avro, then |sr| has a subject
called ``transactions-value``. If the producer is also serializing the message key as Avro, |sr|
would have a subject called ``transactions-key``, but for simplicity, this tutorial considers only
the message value. That |sr| subject ``transactions-value`` has at least one schema called
``Payment``. The subject ``transactions-value`` defines the scope in which schemas for that subject
can evolve, and |sr| does compatibility checking within this scope. In this scenario, if developers
evolve the schema ``Payment`` and produce new messages to the topic ``transactions``, |sr| checks
that those newly evolved schemas are compatible with older schemas in the subject
``transactions-value`` and adds those new schemas to the subject.
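To make this concrete, the ``Payment`` schema used throughout this tutorial (you will see it again
later, in escaped form, in the |sr| REST API output) is an Avro record along these lines:

.. code:: json

   {
     "type": "record",
     "name": "Payment",
     "namespace": "io.confluent.examples.clients.basicavro",
     "fields": [
       {"name": "id", "type": "string"},
       {"name": "amount", "type": "double"}
     ]
   }

When a producer registers this schema for the message value of the topic ``transactions``, |sr|
stores it under the subject ``transactions-value``.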
Setup
~~~~~

.. _sr-tutorial-prereqs:

Prerequisites
^^^^^^^^^^^^^

Verify that you have installed the following on your local machine:

* :cp-download:`Confluent Platform|`
* :confluent-cli:`Confluent CLI|installing.html`
* Java 8 or 11 to run |cp| (for full information about |cp| Java prerequisites, see Java under
  Supported Versions and Interoperability)
* Maven to compile the client Java code
* The ``jq`` tool to format the results of queries against the |sr| REST endpoint

Environment Setup
^^^^^^^^^^^^^^^^^

#. Use the :ref:`quickstart` to bring up a single-node |cp| development environment. With a
   single-line :confluent-cli:`confluent local|command-reference/local/index.html` command, you can
   have a basic |ak| cluster with |sr|, |c3-short|, and other services running on your local
   machine.

   .. codewithvars:: bash

      |confluent_start|

   Your output should resemble:

   .. code:: bash

      Starting zookeeper
      zookeeper is [UP]
      Starting kafka
      kafka is [UP]
      Starting schema-registry
      schema-registry is [UP]
      Starting kafka-rest
      kafka-rest is [UP]
      Starting connect
      connect is [UP]
      Starting ksql-server
      ksql-server is [UP]
      Starting control-center
      control-center is [UP]

#. .. include:: includes/sr_environment_setup.rst

#. Create a local configuration file with the |ak| and |sr| connection information for the services
   running on your local machine, and save it to ``$HOME/.confluent/java.config``, where ``$HOME``
   represents your user home directory. It should resemble the following:

   .. code::

      # Kafka broker running on the local machine
      bootstrap.servers=localhost:9092

      # Confluent Schema Registry running on the local machine
      schema.registry.url=http://localhost:8081

Create the transactions topic
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For the exercises in this tutorial, you will be producing to and consuming from a topic called
``transactions``. Create this topic in |c3-short|.

#. Navigate to the |c3-short| web interface at http://localhost:9021/.

   .. important:: It may take a minute or two for |c3-short| to come online.

   .. image:: ../images/c3-landing-page.png
      :width: 600px

#. Click into the cluster, select **Topics**, and click **Add a topic**.

   .. image:: ../images/c3-create-topic-sr.png
      :width: 600px

#. Name the topic ``transactions`` and click **Create with defaults**.

   .. image:: ../images/c3-create-topic-name-sr.png
      :width: 600px

   The new topic is displayed.

   .. image:: ../images/c3-create-topic-new-sr.png
      :width: 600px

.. _schema_registry_tutorial_definition:

Schema Definition
~~~~~~~~~~~~~~~~~

.. include:: includes/sr_schema_definition.rst

.. _sr-tutorial-clients-avro-maven:

Client Applications Writing Avro
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Maven
^^^^^

.. include:: includes/sr_maven.rst

Configuring Avro
^^^^^^^^^^^^^^^^

.. include:: includes/sr_configuring_avro.rst

.. _sr-tutorial-java-producers:

Java Producers
^^^^^^^^^^^^^^

.. include:: includes/sr_java_producers.rst
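Before looking at the complete example that follows, here is a minimal sketch of the key pieces of
an Avro producer for the ``transactions`` topic. It assumes the ``Payment`` class generated from
the Avro schema and |sr| listening on ``localhost:8081``; the class name, key, and amount shown
here are placeholders.

.. code:: java

   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.Producer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import io.confluent.examples.clients.basicavro.Payment;

   public class PaymentProducerSketch {

       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           // Serialize the String key with the standard serializer and the Payment value as Avro.
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                     "org.apache.kafka.common.serialization.StringSerializer");
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                     "io.confluent.kafka.serializers.KafkaAvroSerializer");
           // The Avro serializer registers the Payment schema under the subject transactions-value.
           props.put("schema.registry.url", "http://localhost:8081");

           try (Producer<String, Payment> producer = new KafkaProducer<>(props)) {
               Payment payment = new Payment("my-transaction-id", 1000.00d);
               producer.send(new ProducerRecord<>("transactions", payment.getId().toString(), payment));
               producer.flush();
           }
       }
   }

The full example in ``examples/clients/avro`` adds error handling and produces a series of records;
the sketch above only shows the configuration that ties the producer to |sr|.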
---------------------
Example Producer Code
---------------------

.. include:: includes/sr_example_producer.rst

----------------
Run the Producer
----------------

Run the following commands in a shell from ``examples/clients/avro``.

#. To run this producer, first compile the project:

   .. code:: bash

      mvn clean compile package

#. From the |c3-short| navigation menu at http://localhost:9021/, make sure the cluster is
   selected, and click **Topics**. Next, click the ``transactions`` topic and go to the
   **Messages** tab. You should see no messages because no messages have been produced to this
   topic yet.

#. .. include:: includes/sr_run_producer.rst

#. Now you should be able to see messages in |c3-short| by inspecting the ``transactions`` topic as
   it dynamically deserializes the newly arriving data that was serialized as Avro. At
   http://localhost:9021/, click into the cluster on the left, then go to **Topics** ->
   ``transactions`` -> **Messages**.

   .. tip:: If you do not see any data, rerun the producer, verify that it completed successfully,
      and look at |c3-short| again. The messages do not persist in |c3-short|, so you need to view
      them soon after you run the producer.

   .. figure:: ../images/c3-inspect-transactions.png
      :width: 600px

.. _sr-tutorial-java-consumers:

Java Consumers
^^^^^^^^^^^^^^

.. include:: includes/sr_java_consumers.rst
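As a counterpart to the producer sketch above, here is a minimal sketch of a consumer that reads
the Avro-encoded ``Payment`` records back. It assumes the same generated ``Payment`` class and a
local |sr| at ``localhost:8081``; the class and group id names are placeholders, and the complete
program follows in the included example.

.. code:: java

   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import io.confluent.examples.clients.basicavro.Payment;

   public class PaymentConsumerSketch {

       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-consumer-sketch");
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                     "org.apache.kafka.common.serialization.StringDeserializer");
           // KafkaAvroDeserializer fetches the writer's schema from Schema Registry by its id.
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                     "io.confluent.kafka.serializers.KafkaAvroDeserializer");
           props.put("schema.registry.url", "http://localhost:8081");
           // Deserialize into the generated Payment class rather than a GenericRecord.
           props.put("specific.avro.reader", "true");

           try (KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
               consumer.subscribe(Collections.singletonList("transactions"));
               while (true) {
                   ConsumerRecords<String, Payment> records = consumer.poll(Duration.ofMillis(100));
                   for (ConsumerRecord<String, Payment> record : records) {
                       System.out.printf("key = %s, value = %s%n", record.key(), record.value());
                   }
               }
           }
       }
   }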
---------------------
Example Consumer Code
---------------------

.. include:: includes/sr_example_consumer.rst

----------------
Run the Consumer
----------------

.. include:: includes/sr_run_consumer.rst

Other |ak| Clients
^^^^^^^^^^^^^^^^^^^

.. include:: includes/sr_other_clients.rst

Centralized Schema Management
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Viewing Schemas in Schema Registry
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

At this point, you have producers serializing Avro data and consumers deserializing Avro data. The
producers are registering schemas to |sr| and the consumers are retrieving schemas from |sr|.

#. From the |c3-short| navigation menu at http://localhost:9021/, make sure the cluster is selected
   on the left, and click **Topics**.

#. Click the ``transactions`` topic and go to the **Schema** tab to retrieve the latest schema from
   |sr| for this topic:

   .. figure:: ../images/c3-schema-transactions.png
      :width: 600px

   The schema is identical to the :ref:`schema file defined for Java client applications
   <schema_registry_tutorial_definition>`.

.. _tutorial-use-curl-with-schema-registry:

Using curl to Interact with Schema Registry
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can also use ``curl`` commands to connect directly to the REST endpoint in |sr| to view
subjects and associated schemas.

#. To view all the subjects registered in |sr| (assuming |sr| is running on the local machine
   listening on port 8081):

   .. code:: bash

      curl --silent -X GET http://localhost:8081/subjects/ | jq .

   Here is the expected output of the above command:

   .. code:: bash

      [
        "transactions-value"
      ]

   In this example, the |ak| topic ``transactions`` has messages whose value (that is, the
   `payload`) is Avro, and by default the |sr| subject name is ``transactions-value``.

#. To view the latest schema for this subject in more detail:

   .. code:: bash

      curl --silent -X GET http://localhost:8081/subjects/transactions-value/versions/latest | jq .

   Here is the expected output of the above command:

   .. code:: bash

      {
        "subject": "transactions-value",
        "version": 1,
        "id": 1,
        "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
      }

   Here is a breakdown of what this version of the schema defines:

   * ``subject``: the scope in which schemas for the messages in the topic ``transactions`` can evolve
   * ``version``: the schema version for this subject, which starts at 1 for each subject
   * ``id``: the globally unique schema id, unique across all schemas in all subjects
   * ``schema``: the structure that defines the schema format

   Notice that in the output of the ``curl`` command above, the schema is escaped JSON; the double
   quotes are preceded by backslashes.

#. Based on the schema id, you can also retrieve the associated schema by querying the |sr| REST
   endpoint as follows:

   .. code:: bash

      curl --silent -X GET http://localhost:8081/schemas/ids/1 | jq .

   Here is the expected output:

   .. code:: bash

      {
        "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
      }

Schema IDs in Messages
^^^^^^^^^^^^^^^^^^^^^^

Integration with |sr| means that |ak| messages do not need to be written with the entire Avro
schema. Instead, |ak| messages are written with the schema id. The producers writing the messages
and the consumers reading the messages must use the same |sr| to get the same mapping between a
schema and schema id.

In this example, a producer sends the new schema for ``Payment`` to |sr|. |sr| registers this
schema ``Payment`` to the subject ``transactions-value``, and returns the schema id of ``1`` to the
producer. The producer caches this mapping between the schema and schema id for subsequent message
writes, so it only contacts |sr| on the first schema write.

When a consumer reads this data, it sees the Avro schema id of ``1`` and sends a schema request to
|sr|. |sr| retrieves the schema associated with schema id ``1``, and returns the schema to the
consumer. The consumer caches this mapping between the schema and schema id for subsequent message
reads, so it only contacts |sr| on the first schema id read.
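The caching lookups described above are handled for you by the Avro serializer and deserializer. If
you want to perform the same lookups yourself from Java, you can use the |sr| client directly. The
following is a minimal sketch, assuming the ``kafka-schema-registry-client`` dependency is on your
classpath and that the method names match a recent version of the client:

.. code:: java

   import io.confluent.kafka.schemaregistry.ParsedSchema;
   import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
   import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
   import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

   public class SchemaLookupSketch {

       public static void main(String[] args) throws Exception {
           // The second argument is the capacity of the client's local schema cache.
           SchemaRegistryClient client =
               new CachedSchemaRegistryClient("http://localhost:8081", 100);

           // Look up a schema by its globally unique id, as a consumer would.
           ParsedSchema byId = client.getSchemaById(1);
           System.out.println("Schema id 1: " + byId.canonicalString());

           // Look up the latest schema registered under the subject transactions-value.
           SchemaMetadata latest = client.getLatestSchemaMetadata("transactions-value");
           System.out.println("Latest version: " + latest.getVersion() + ", id: " + latest.getId());
       }
   }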
.. _auto-schema-registration:

Auto Schema Registration
^^^^^^^^^^^^^^^^^^^^^^^^

.. include:: includes/auto-schema-registration.rst

To manually register the schema outside of the application, you can use |c3-short|.

.. include:: includes/sr_schema_registration.rst

If you prefer to connect directly to the REST endpoint in |sr|, then to define a schema for a new
subject for the topic ``test``, run the command below.

.. code:: bash

   curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' \
     http://localhost:8081/subjects/test-value/versions

If the command succeeds, the output shows that the schema was created with an id of ``1``:

.. code:: bash

   {"id":1}

Schema Evolution and Compatibility
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Evolving Schemas
^^^^^^^^^^^^^^^^

.. include:: includes/sr_evolving_schemas.rst

Failing Compatibility Checks
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. include:: includes/sr_failing_compatibility_checks.rst

#. Run the compatibility check and verify that it fails:

   .. code:: bash

      mvn io.confluent:kafka-schema-registry-maven-plugin:test-compatibility

   Here is the error message you will get:

   .. code:: bash

      ...
      [ERROR] Schema examples/clients/avro/src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2a.avsc is not compatible with subject(transactions-value)
      ...

#. Try to register the new schema ``Payment2a`` manually to |sr|, which is a useful way for
   non-Java clients to check compatibility if you are not using |c3-short|:

   .. code:: bash

      curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
        --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\"}]}"}' \
        http://localhost:8081/subjects/transactions-value/versions

   As expected, |sr| rejects the schema with an error message that it is incompatible:

   .. code:: bash

      {"error_code":409,"message":"Schema being registered is incompatible with an earlier schema"}

.. _sr_passing_compatibility_checks:

Passing Compatibility Checks
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. include:: includes/sr_passing_compatibility_checks.rst

#. If you prefer to connect directly to the REST endpoint in |sr|, then to register the new schema
   ``Payment2b``, run the command below. It should succeed.

   .. code:: bash

      curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
        --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\",\"default\":\"\"}]}"}' \
        http://localhost:8081/subjects/transactions-value/versions

   The above ``curl`` command, if successful, returns the ``id`` of the new schema:

   .. code:: bash

      {"id":2}

#. View the latest subject for ``transactions-value`` in |sr|:

   .. code:: bash

      curl --silent -X GET http://localhost:8081/subjects/transactions-value/versions/latest | jq .

   This command returns the latest version registered under the subject ``transactions-value``,
   including the version number, schema id, and the schema itself as escaped JSON:

   .. code:: bash

      {
        "subject": "transactions-value",
        "version": 2,
        "id": 2,
        "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\",\"default\":\"\"}]}"
      }

   Notice the changes:

   * ``version``: changed from ``1`` to ``2``
   * ``id``: changed from ``1`` to ``2``
   * ``schema``: updated with the new field ``region`` that has a default value
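Java applications can run the same kind of compatibility check programmatically before registering
a schema. The following is a minimal sketch using the |sr| Java client, assuming the
``kafka-schema-registry-client`` dependency is available and that the method names match a recent
version of the client; the candidate schema string is the evolved ``Payment`` schema shown above.

.. code:: java

   import io.confluent.kafka.schemaregistry.avro.AvroSchema;
   import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
   import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

   public class CompatibilityCheckSketch {

       public static void main(String[] args) throws Exception {
           SchemaRegistryClient client =
               new CachedSchemaRegistryClient("http://localhost:8081", 100);

           // Candidate schema: Payment with a new "region" field that has a default value.
           String candidate = "{\"type\":\"record\",\"name\":\"Payment\","
               + "\"namespace\":\"io.confluent.examples.clients.basicavro\","
               + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"},"
               + "{\"name\":\"amount\",\"type\":\"double\"},"
               + "{\"name\":\"region\",\"type\":\"string\",\"default\":\"\"}]}";

           // Ask Schema Registry whether the candidate is compatible with the existing schemas
           // in the subject, under the subject's configured compatibility type.
           boolean compatible =
               client.testCompatibility("transactions-value", new AvroSchema(candidate));
           System.out.println("Compatible with transactions-value: " + compatible);
       }
   }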
Changing Compatibility Type
^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. include:: includes/sr_change_compatibility_type.rst

If you prefer to connect directly to the REST endpoint in |sr|, then to change the compatibility
type for the topic ``transactions``, i.e., for the subject ``transactions-value``, run the example
command below.

.. code:: bash

   curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     --data '{"compatibility": "BACKWARD_TRANSITIVE"}' \
     http://localhost:8081/config/transactions-value

Related Content
~~~~~~~~~~~~~~~

.. include:: includes/sr_next_steps.rst

* Confluent supported schema formats, and how to configure clients using Avro, Protobuf, or JSON
  Schema: :platform:`Formats, Serializers, and Deserializers|schema-registry/fundamentals/serdes-develop/index.html`
* Try it out: :platform:`Schema Registry API Usage Examples|schema-registry/develop/using.html`,
  showing more curl commands over HTTP and HTTPS
* User guide for managing schemas on |c3|: :ref:`topicschema`
* Production deployments of |sr|: :ref:`schema-registry-prod`
* Big picture: :ref:`cp-demo` shows |sr| in the context of a full |cp| deployment, with various
  types of security enabled