.. _schema_registry_onprem_tutorial:

On-Premises |sr| Tutorial
=========================

Overview
~~~~~~~~

This tutorial provides a step-by-step workflow for using |sr-long|. You will learn how to enable client applications to
read and write Avro data, check compatibility as schemas evolve, and use |c3|, which has integrated capabilities with |sr|.

This tutorial is intended to run on a local install of |cp|. If you have a |ccloud| cluster, you may consult the
|sr-ccloud| tutorial meant for running on |ccloud| at :ref:`schema_registry_ccloud_tutorial`.

Setup
~~~~~

.. _sr-tutorial-prereqs:

Prerequisites
^^^^^^^^^^^^^

Before proceeding with this tutorial, review a summary of the |sr| concepts in :ref:`schema_registry_tutorial`.

Verify that you have installed the following on your local machine:

* `Confluent Platform `__
* :ref:`Confluent CLI `
* Java 1.8 or 11 to run |cp|
* Maven to compile the client Java code
* ``jq`` tool to nicely format the results from querying the |sr| REST endpoint

Environment Setup
^^^^^^^^^^^^^^^^^

#. Use the :ref:`quickstart` to bring up a single-node |cp| development environment. With a single-line
   :ref:`confluent_local` command, you can have a basic |ak| cluster with |sr|, |c3-short|, and other services
   running on your local machine.

   .. code:: bash

      confluent local start

   Your output should resemble:

   .. code:: bash

      Starting zookeeper
      zookeeper is [UP]
      Starting kafka
      kafka is [UP]
      Starting schema-registry
      schema-registry is [UP]
      Starting kafka-rest
      kafka-rest is [UP]
      Starting connect
      connect is [UP]
      Starting ksql-server
      ksql-server is [UP]
      Starting control-center
      control-center is [UP]

#. .. include:: includes/sr_environment_setup.rst

#. Create a local configuration file with the connection information for the |ak| and |sr| services running on
   your local machine, and save it to ``$HOME/.confluent/java.config``. It should resemble the following:

   .. literalinclude:: ../tutorials/examples/clients/docs/includes/configs/local/java-sr.config

Create the transactions topic
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For the exercises in this tutorial, you will be producing to and consuming from a topic called ``transactions``.
Create this topic in |c3-short|.

#. Navigate to the |c3-short| web interface at http://localhost:9021/.

   .. important:: It may take a minute or two for |c3-short| to come online.

   .. image:: ../images/c3-landing-page.png
      :width: 600px

#. Click into the cluster, select **Topics** and click **Add a topic**.

   .. image:: ../images/c3-create-topic-sr.png
      :width: 600px

#. Name the topic ``transactions`` and click **Create with defaults**.

   .. image:: ../images/c3-create-topic-name-sr.png
      :width: 600px

   The new topic is displayed.

   .. image:: ../images/c3-create-topic-new-sr.png
      :width: 600px

.. _schema_registry_tutorial_definition:

Schema Definition
~~~~~~~~~~~~~~~~~

.. include:: includes/sr_schema_definition.rst

.. _sr-tutorial-clients-avro-maven:

Client Applications Writing Avro
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Maven
^^^^^

.. include:: includes/sr_maven.rst

Configuring Avro
^^^^^^^^^^^^^^^^

.. include:: includes/sr_configuring_avro.rst

.. _sr-tutorial-java-producers:

Java Producers
^^^^^^^^^^^^^^

.. include:: includes/sr_java_producers.rst

---------------------
Example Producer Code
---------------------

.. include:: includes/sr_example_producer.rst

----------------
Run the Producer
----------------

Run the following commands in a shell from ``examples/clients/avro``.

#. To run this producer, first compile the project:

   .. code:: bash

      mvn clean compile package

#. From the |c3-short| navigation menu at http://localhost:9021/, make sure the cluster is selected, and click
   **Topics**.

   Next, click the ``transactions`` topic and go to the **Messages** tab.

   You should see no messages because no messages have been produced to this topic yet.

#. .. include:: includes/sr_run_producer.rst

#. Now you should be able to see messages in |c3-short| by inspecting the ``transactions`` topic as it dynamically
   deserializes the newly arriving data that was serialized as Avro.

   At http://localhost:9021/, click into the cluster on the left, then go to **Topics** -> ``transactions`` -> **Messages**.

   .. tip:: If you do not see any data, rerun the producer, verify that it completed successfully, and check
            |c3-short| again. The messages do not persist in the Console, so you need to view them soon after
            you run the producer.

   .. figure:: ../images/c3-inspect-transactions.png
      :width: 600px

.. _sr-tutorial-java-consumers:

Java Consumers
^^^^^^^^^^^^^^

.. include:: includes/sr_java_consumers.rst

---------------------
Example Consumer Code
---------------------

.. include:: includes/sr_example_consumer.rst

----------------
Run the Consumer
----------------

.. include:: includes/sr_run_consumer.rst

Other |ak| Clients
^^^^^^^^^^^^^^^^^^^

.. include:: includes/sr_other_clients.rst

Centralized Schema Management
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Viewing Schemas in Schema Registry
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

At this point, you have producers serializing Avro data and consumers deserializing Avro data.
The producers are registering schemas to |sr| and consumers are retrieving schemas from |sr|.

#. From the |c3-short| navigation menu at http://localhost:9021/, make sure the cluster is selected on the left,
   and click **Topics**.

#. Click the ``transactions`` topic and go to the **Schema** tab to retrieve the latest schema from |sr| for this topic:

   .. figure:: ../images/c3-schema-transactions.png
      :width: 600px

The schema is identical to the :ref:`schema file defined for Java client applications`.

.. _tutorial-use-curl-with-schema-registry:

Using curl to Interact with Schema Registry
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can also use `curl `__ commands to connect directly to the REST endpoint in |sr| to view subjects and associated schemas.

#. To view all the subjects registered in |sr| (assuming |sr| is running on the local machine listening on port 8081):

   .. code:: bash

      curl --silent -X GET http://localhost:8081/subjects/ | jq .

   Here is the expected output of the above command:

   .. code:: bash

      [
        "transactions-value"
      ]

   In this example, the |ak| topic ``transactions`` has messages whose value (that is, `payload`) is Avro, and by
   default the |sr| subject name is ``transactions-value``.

#. To view the latest schema for this subject in more detail:

   .. code:: bash

      curl --silent -X GET http://localhost:8081/subjects/transactions-value/versions/latest | jq .

   Here is the expected output of the above command:

   .. code:: bash

      {
        "subject": "transactions-value",
        "version": 1,
        "id": 1,
        "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
      }

   Here is a break-down of what this version of the schema defines:

   * ``subject``: the scope in which schemas for the messages in the topic ``transactions`` can evolve
   * ``version``: the schema version for this subject, which starts at 1 for each subject
   * ``id``: the globally unique schema version id, unique across all schemas in all subjects
   * ``schema``: the structure that defines the schema format

   Notice that in the output to the ``curl`` command above, the schema is escaped JSON; the double quotes are
   preceded by backslashes.

#. Based on the schema id, you can also retrieve the associated schema by querying the |sr| REST endpoint as follows:

   .. code:: bash

      curl --silent -X GET http://localhost:8081/schemas/ids/1 | jq .

   Here is the expected output:

   .. code:: bash

      {
        "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
      }

Schema IDs in Messages
^^^^^^^^^^^^^^^^^^^^^^

Integration with |sr| means that |ak| messages do not need to be written with the entire Avro schema.
Instead, |ak| messages are written with the schema id. The producers writing the messages and the consumers
reading the messages must be using the same |sr| to get the same mapping between a schema and schema id.

In this example, a producer sends the new schema for ``Payment`` to |sr|. |sr| registers this schema ``Payment``
to the subject ``transactions-value``, and returns the schema id of ``1`` to the producer. The producer caches this
mapping between the schema and schema id for subsequent message writes, so it only contacts |sr| on the first schema write.

When a consumer reads this data, it sees the Avro schema id of ``1`` and sends a schema request to |sr|. |sr|
retrieves the schema associated to schema id ``1``, and returns the schema to the consumer. The consumer caches this
mapping between the schema and schema id for subsequent message reads, so it only contacts |sr| on the first schema id read.

.. _auto-schema-registration:

Auto Schema Registration
^^^^^^^^^^^^^^^^^^^^^^^^

.. include:: includes/auto-schema-registration.rst

To manually register the schema outside of the application, you can use |c3-short|.

.. include:: includes/sr_schema_registration.rst

If you prefer to connect directly to the REST endpoint in |sr|, then to define a schema for a new subject for
the topic ``test``, run the command below.

.. code:: bash

   curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' \
     http://localhost:8081/subjects/test-value/versions

In this sample output, a schema is created with an id of ``1``:

.. code:: bash

   {"id":1}

Schema Evolution and Compatibility
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Evolving Schemas
^^^^^^^^^^^^^^^^

.. include:: includes/sr_evolving_schemas.rst

Failing Compatibility Checks
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. include:: includes/sr_failing_compatibility_checks.rst

#. Run the compatibility check and verify that it fails:

   .. code:: bash

      mvn io.confluent:kafka-schema-registry-maven-plugin:test-compatibility

   Here is the error message you will get:

   .. code:: bash

      ...
      [ERROR] Schema examples/clients/avro/src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2a.avsc is not compatible with subject(transactions-value)
      ...

#. Try to register the new schema ``Payment2a`` manually to |sr|, which is a useful way for non-Java clients to
   check compatibility if you are not using |c3-short|:

   .. code:: bash

      curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
        --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\"}]}"}' \
        http://localhost:8081/subjects/transactions-value/versions

   As expected, |sr| rejects the schema with an error message that it is incompatible:

   .. code:: bash

      {"error_code":409,"message":"Schema being registered is incompatible with an earlier schema"}

Passing Compatibility Checks
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. include:: includes/sr_passing_compatibility_checks.rst

#. If you prefer to connect directly to the REST endpoint in |sr|, then to register the new schema ``Payment2b``,
   run the command below. It should succeed.

   .. code:: bash

      curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
        --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\",\"default\":\"\"}]}"}' \
        http://localhost:8081/subjects/transactions-value/versions

   The above ``curl`` command, if successful, returns the schema ``id`` of the newly registered schema:

   .. code:: bash

      {"id":2}

#. View the latest schema for the subject ``transactions-value`` in |sr|:

   .. code:: bash

      curl --silent -X GET http://localhost:8081/subjects/transactions-value/versions/latest | jq .

   This command returns the latest schema registered under the ``transactions-value`` subject, including the
   version number, the id, and the schema itself as escaped JSON:

   .. code:: bash

      {
        "subject": "transactions-value",
        "version": 2,
        "id": 2,
        "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\",\"default\":\"\"}]}"
      }

   Notice the changes:

   * ``version``: changed from ``1`` to ``2``
   * ``id``: changed from ``1`` to ``2``
   * ``schema``: updated with the new field ``region`` that has a default value

Changing Compatibility Type
^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. include:: includes/sr_change_compatibility_type.rst

If you prefer to connect directly to the REST endpoint in |sr|, then to change the compatibility type for the
topic ``transactions``, that is, for the subject ``transactions-value``, run the example command below.

.. code:: bash

   curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     --data '{"compatibility": "BACKWARD_TRANSITIVE"}' \
     http://localhost:8081/config/transactions-value

Next Steps
~~~~~~~~~~

.. include:: includes/sr_next_steps.rst

* Read the user guide on managing schemas on |c3|: :ref:`topicschema`
* For production deployments of |sr|, refer to :ref:`schema-registry-prod`.
* Work through the :ref:`cp-demo` to understand |sr| in the context of a full |cp| deployment, including various types of security enabled.