.. _schema_registry_onprem_tutorial:
On-Premises |sr| Tutorial
=========================
Overview
~~~~~~~~
This tutorial provides a step-by-step workflow for using |sr-long|.
You will learn how to enable client applications to read and write Avro data, check compatibility as schemas evolve, and use |c3|, which has integrated capabilities with |sr|.
This tutorial is intended to run on a local install of |cp|. If you have a |ccloud| cluster, you may consult the |sr-ccloud| tutorial meant for running on |ccloud| at :ref:`schema_registry_ccloud_tutorial`.
Setup
~~~~~
.. _sr-tutorial-prereqs:
Prerequisites
^^^^^^^^^^^^^
Before proceeding with this tutorial, review the summary of |sr| concepts in :ref:`schema_registry_tutorial`.
Verify that you have installed the following on your local machine:
* `Confluent Platform `__
* :ref:`Confluent CLI `
* Java 1.8 or 1.11 to run |cp|
* Maven to compile the client Java code
* ``jq`` tool to nicely format the results from querying the |sr| REST endpoint
Environment Setup
^^^^^^^^^^^^^^^^^
#. Use the :ref:`quickstart` to bring up a single-node |cp| development environment. With a single-line :ref:`confluent_local` command, you can have a basic |ak| cluster with |sr|, |c3-short|, and other services running on your local machine.
.. code:: bash
confluent local start
Your output should resemble:
.. code:: bash
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
#. .. include:: includes/sr_environment_setup.rst
#. Create a local configuration file with the connection information for the |ak| and |sr| instances running on your local machine, and save it to ``$HOME/.confluent/java.config``. It should resemble the following:
.. literalinclude:: ../tutorials/examples/clients/docs/includes/configs/local/java-sr.config
Create the transactions topic
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
For the exercises in this tutorial, you will be producing to and consuming from a topic called ``transactions``. Create this topic in |c3-short|.
#. Navigate to the |c3-short| web interface at `http://localhost:9021/ `_.
.. important:: It may take a minute or two for |c3-short| to come online.
.. image:: ../images/c3-landing-page.png
:width: 600px
#. Click into the cluster, select **Topics** and click **Add a topic**.
.. image:: ../images/c3-create-topic-sr.png
:width: 600px
#. Name the topic ``transactions`` and click **Create with defaults**.
.. image:: ../images/c3-create-topic-name-sr.png
:width: 600px
The new topic is displayed.
.. image:: ../images/c3-create-topic-new-sr.png
:width: 600px
.. _schema_registry_tutorial_definition:
Schema Definition
~~~~~~~~~~~~~~~~~
.. include:: includes/sr_schema_definition.rst
.. _sr-tutorial-clients-avro-maven:
Client Applications Writing Avro
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Maven
^^^^^
.. include:: includes/sr_maven.rst
Configuring Avro
^^^^^^^^^^^^^^^^
.. include:: includes/sr_configuring_avro.rst
.. _sr-tutorial-java-producers:
Java Producers
^^^^^^^^^^^^^^
.. include:: includes/sr_java_producers.rst
---------------------
Example Producer Code
---------------------
.. include:: includes/sr_example_producer.rst
----------------
Run the Producer
----------------
Run the following commands in a shell from ``examples/clients/avro``.
#. To run this producer, first compile the project:
.. code:: bash
mvn clean compile package
#. From the |c3-short| navigation menu at `http://localhost:9021/ `_, make sure the cluster is selected, and click **Topics**.
Next, click the ``transactions`` topic and go to the **Messages** tab.
You should see no messages because no messages have been produced to this topic yet.
#. .. include:: includes/sr_run_producer.rst
#. Now you should be able to see messages in |c3-short| by inspecting the ``transactions`` topic as it dynamically deserializes the newly arriving data that was serialized as Avro.
At `http://localhost:9021/ `_, click into the cluster on the left, then go to **Topics** -> ``transactions`` -> **Messages**.
.. tip:: If you do not see any data, rerun the producer, verify that it completed successfully, and check |c3-short| again. The messages do not persist in the Console, so you need to view them soon after you run the producer.
.. figure:: ../images/c3-inspect-transactions.png
:width: 600px
.. _sr-tutorial-java-consumers:
Java Consumers
^^^^^^^^^^^^^^
.. include:: includes/sr_java_consumers.rst
---------------------
Example Consumer Code
---------------------
.. include:: includes/sr_example_consumer.rst
----------------
Run the Consumer
----------------
.. include:: includes/sr_run_consumer.rst
Other |ak| Clients
^^^^^^^^^^^^^^^^^^^
.. include:: includes/sr_other_clients.rst
Centralized Schema Management
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Viewing Schemas in Schema Registry
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
At this point, you have producers serializing Avro data and consumers deserializing Avro data.
The producers are registering schemas to |sr| and consumers are retrieving schemas from |sr|.
#. From the |c3-short| navigation menu at `http://localhost:9021/ `__, make sure the cluster is selected on the left, and click **Topics**.
#. Click the ``transactions`` topic and go to the **Schema** tab to retrieve the latest schema from |sr| for this topic:
.. figure:: ../images/c3-schema-transactions.png
:width: 600px
The schema is identical to the :ref:`schema file defined for Java client applications`.
.. _tutorial-use-curl-with-schema-registry:
Using curl to Interact with Schema Registry
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can also use `curl `__ commands to connect directly to the REST endpoint in |sr| to view subjects and associated schemas.
#. To view all the subjects registered in |sr| (assuming |sr| is running on the local machine listening on port 8081):
.. code:: bash
curl --silent -X GET http://localhost:8081/subjects/ | jq .
Here is the expected output of the above command:
.. code:: bash
[
"transactions-value"
]
In this example, the |ak| topic ``transactions`` has messages whose value (that is, the payload) is Avro, and by default the |sr| subject name is ``transactions-value``.
#. To view the latest schema for this subject in more detail:
.. code:: bash
curl --silent -X GET http://localhost:8081/subjects/transactions-value/versions/latest | jq .
Here is the expected output of the above command:
.. code:: bash
{
"subject": "transactions-value",
"version": 1,
"id": 1,
"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
}
Here is a breakdown of what this version of the schema defines:
* ``subject``: the scope in which schemas for the messages in the topic ``transactions`` can evolve
* ``version``: the schema version for this subject, which starts at 1 for each subject
* ``id``: the globally unique schema id, unique across all schemas in all subjects
* ``schema``: the structure that defines the schema format
Notice that in the output of the ``curl`` command above, the schema is escaped JSON; the double quotes are preceded by backslashes.
#. Based on the schema id, you can also retrieve the associated schema by querying the |sr| REST endpoint as follows:
.. code:: bash
curl --silent -X GET http://localhost:8081/schemas/ids/1 | jq .
Here is the expected output:
.. code:: bash
{
"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
}
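
The relationship between ``subject``, ``version``, and ``id`` in the responses above can be sketched with a toy in-memory model. This is an illustration only and not how |sr| is implemented: the class ``RegistryModelSketch`` and its methods are hypothetical, and ids are assumed to be handed out sequentially starting at 1.

.. code:: java

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Toy model of subjects, versions, and ids (hypothetical, not the real registry). */
    public class RegistryModelSketch {
        // Global schema ids, shared by every subject.
        private final Map<String, Integer> idBySchema = new HashMap<>();
        // Per-subject history; a schema's version is its position in the list plus one.
        private final Map<String, List<Integer>> idsBySubject = new HashMap<>();

        /** Default subject name for the message values of a topic. */
        public static String valueSubjectFor(String topic) {
            return topic + "-value";
        }

        /** Registers a schema under a subject and returns its global id. */
        public int register(String subject, String schema) {
            Integer id = idBySchema.get(schema);
            if (id == null) {
                id = idBySchema.size() + 1; // assumption: sequential ids
                idBySchema.put(schema, id);
            }
            List<Integer> history = idsBySubject.computeIfAbsent(subject, s -> new ArrayList<>());
            if (!history.contains(id)) {
                history.add(id); // a new version for this subject only
            }
            return id;
        }

        /** Latest version number for a subject, or 0 if nothing is registered. */
        public int latestVersion(String subject) {
            List<Integer> history = idsBySubject.get(subject);
            return history == null ? 0 : history.size();
        }

        public static void main(String[] args) {
            RegistryModelSketch registry = new RegistryModelSketch();
            String subject = valueSubjectFor("transactions");
            System.out.println(subject);                                       // transactions-value
            System.out.println(registry.register(subject, "Payment v1"));      // 1
            System.out.println(registry.register("test-value", "Payment v1")); // 1
            System.out.println(registry.latestVersion("test-value"));          // 1
        }
    }

In this model, registering the same schema text under a second subject reuses the existing id while that subject starts its own version history at 1.
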
Schema IDs in Messages
^^^^^^^^^^^^^^^^^^^^^^
Integration with |sr| means that |ak| messages do not need to be written with the entire Avro schema.
Instead, |ak| messages are written with the schema id.
The producers writing the messages and the consumers reading the messages must be using the same |sr| to get the same mapping between a schema and schema id.
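
To make the idea of writing a schema id instead of the full schema concrete, here is a minimal sketch of how such a message could be framed. It assumes the Confluent wire format of one zero "magic" byte, followed by the schema id as a 4-byte big-endian integer, followed by the Avro-encoded data. The class ``WireFormatSketch`` and its methods are hypothetical; the Confluent Avro serializer does this work for you.

.. code:: java

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    /** Illustrates framing a payload with a schema id (hypothetical helper). */
    public class WireFormatSketch {
        private static final byte MAGIC_BYTE = 0x0;

        /** Prepends the magic byte and the 4-byte schema id to the payload. */
        public static byte[] frame(int schemaId, byte[] avroPayload) {
            return ByteBuffer.allocate(1 + 4 + avroPayload.length)
                    .put(MAGIC_BYTE)
                    .putInt(schemaId) // ByteBuffer writes big-endian by default
                    .put(avroPayload)
                    .array();
        }

        /** Reads the schema id back out of a framed message. */
        public static int schemaIdOf(byte[] message) {
            ByteBuffer buffer = ByteBuffer.wrap(message);
            if (buffer.remaining() < 5 || buffer.get() != MAGIC_BYTE) {
                throw new IllegalArgumentException("Message is not in the assumed framing");
            }
            return buffer.getInt();
        }

        public static void main(String[] args) {
            byte[] payload = {0x02, 0x61}; // placeholder bytes standing in for Avro data
            byte[] message = frame(1, payload);
            System.out.println(Arrays.toString(message)); // [0, 0, 0, 0, 1, 2, 97]
            System.out.println(schemaIdOf(message));      // 1
        }
    }

The framing adds five bytes per message regardless of how large the schema itself is.
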
In this example, a producer sends the new schema for ``Payment`` to |sr|.
|sr| registers this schema ``Payment`` to the subject ``transactions-value``, and returns the schema id of ``1`` to the producer.
The producer caches this mapping between the schema and schema id for subsequent message writes, so it only contacts |sr| on the first schema write.
When a consumer reads this data, it sees the Avro schema id of ``1`` and sends a schema request to |sr|.
|sr| retrieves the schema associated with schema id ``1``, and returns the schema to the consumer.
The consumer caches this mapping between the schema and schema id for subsequent message reads, so it only contacts |sr| on the first schema id read.
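
The consumer-side caching described above can be sketched as follows. This is a simplified illustration, not the client library's actual cache: ``SchemaCacheSketch`` is a hypothetical class, and the ``registryLookup`` function stands in for the REST request to |sr|.

.. code:: java

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.IntFunction;

    /** Caches schemas by id so the registry is asked once per id (hypothetical). */
    public class SchemaCacheSketch {
        private final Map<Integer, String> schemaById = new ConcurrentHashMap<>();
        private final AtomicInteger registryCalls = new AtomicInteger();
        private final IntFunction<String> registryLookup; // stand-in for the REST call

        public SchemaCacheSketch(IntFunction<String> registryLookup) {
            this.registryLookup = registryLookup;
        }

        /** Returns the schema for an id, contacting the registry only on a cache miss. */
        public String schemaFor(int schemaId) {
            return schemaById.computeIfAbsent(schemaId, id -> {
                registryCalls.incrementAndGet();
                return registryLookup.apply(id);
            });
        }

        /** Number of lookups that actually reached the registry stand-in. */
        public int registryCalls() {
            return registryCalls.get();
        }

        public static void main(String[] args) {
            SchemaCacheSketch cache = new SchemaCacheSketch(id -> "schema-for-id-" + id);
            for (int i = 0; i < 3; i++) {
                cache.schemaFor(1); // three messages carrying schema id 1
            }
            System.out.println(cache.registryCalls()); // 1
        }
    }
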
.. _auto-schema-registration:
Auto Schema Registration
^^^^^^^^^^^^^^^^^^^^^^^^
.. include:: includes/auto-schema-registration.rst
To manually register the schema outside of the application, you can use |c3-short|.
.. include:: includes/sr_schema_registration.rst
If you prefer to connect directly to the REST endpoint in |sr|, then to define a schema for a new subject for the topic ``test``, run the command below.
.. code:: bash
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' \
http://localhost:8081/subjects/test-value/versions
The sample output below shows that the schema is registered with an id of ``1``:
.. code:: bash
{"id":1}
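
The request body in the ``curl`` command above embeds the Avro schema as an escaped JSON string. If you build that body in code, the escaping could look like the sketch below. The class ``RegistrationBodySketch`` is hypothetical, and it assumes a compact, single-line schema: only backslashes and double quotes are escaped, so a schema containing newlines or other control characters would need a real JSON library.

.. code:: java

    /** Builds the JSON body for a schema registration request (hypothetical helper). */
    public class RegistrationBodySketch {

        /** Wraps an Avro schema in {"schema": "..."} with quotes and backslashes escaped. */
        public static String toRegistrationBody(String avroSchemaJson) {
            String escaped = avroSchemaJson
                    .replace("\\", "\\\\")  // escape backslashes first
                    .replace("\"", "\\\""); // then escape double quotes
            return "{\"schema\": \"" + escaped + "\"}";
        }

        public static void main(String[] args) {
            String paymentSchema = "{\"type\":\"record\",\"name\":\"Payment\","
                    + "\"namespace\":\"io.confluent.examples.clients.basicavro\","
                    + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"},"
                    + "{\"name\":\"amount\",\"type\":\"double\"}]}";
            // Prints the same body that is passed to --data in the curl command.
            System.out.println(toRegistrationBody(paymentSchema));
        }
    }
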
Schema Evolution and Compatibility
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Evolving Schemas
^^^^^^^^^^^^^^^^
.. include:: includes/sr_evolving_schemas.rst
Failing Compatibility Checks
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. include:: includes/sr_failing_compatibility_checks.rst
#. Run the compatibility check and verify that it fails:
.. code:: bash
mvn io.confluent:kafka-schema-registry-maven-plugin:test-compatibility
Here is the error message you will get:
.. code:: bash
...
[ERROR] Schema examples/clients/avro/src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2a.avsc is not compatible with subject(transactions-value)
...
#. Try to register the new schema ``Payment2a`` manually to |sr|, which is a useful way for non-Java clients to check compatibility if you are not using |c3-short|:
.. code:: bash
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\"}]}"}' \
http://localhost:8081/subjects/transactions-value/versions
As expected, |sr| rejects the schema with an error message that it is incompatible:
.. code:: bash
{"error_code":409,"message":"Schema being registered is incompatible with an earlier schema"}
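
The reason ``Payment2a`` is rejected can be sketched in a few lines. The example assumes the subject uses backward compatibility (the |sr| default), under which a consumer using the new schema must be able to read data written with the earlier one. It models only the rule for added fields; the class ``BackwardCheckSketch`` is hypothetical, and the real check also covers type changes, aliases, and other Avro schema resolution rules.

.. code:: java

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;

    /** Simplified backward-compatibility rule for added fields (hypothetical). */
    public class BackwardCheckSketch {

        /**
         * Returns true if a reader using the new schema can populate every field
         * when decoding data written with the old schema.
         *
         * @param oldFieldNames      field names in the already registered schema
         * @param newFieldHasDefault fields of the new schema, mapped to whether each has a default
         */
        public static boolean canReadOldData(Set<String> oldFieldNames,
                                             Map<String, Boolean> newFieldHasDefault) {
            for (Map.Entry<String, Boolean> field : newFieldHasDefault.entrySet()) {
                boolean absentFromOldData = !oldFieldNames.contains(field.getKey());
                boolean hasDefault = field.getValue();
                if (absentFromOldData && !hasDefault) {
                    return false; // old records have no value for this field
                }
            }
            return true;
        }

        public static void main(String[] args) {
            Set<String> payment = new HashSet<>(Arrays.asList("id", "amount"));

            Map<String, Boolean> payment2a = new LinkedHashMap<>();
            payment2a.put("id", false);
            payment2a.put("amount", false);
            payment2a.put("region", false); // added without a default

            Map<String, Boolean> withDefault = new LinkedHashMap<>(payment2a);
            withDefault.put("region", true); // same field, now with a default

            System.out.println(canReadOldData(payment, payment2a));   // false
            System.out.println(canReadOldData(payment, withDefault)); // true
        }
    }
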
Passing Compatibility Checks
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. include:: includes/sr_passing_compatibility_checks.rst
#. If you prefer to connect directly to the REST endpoint in |sr|, then to register the new schema ``Payment2b``, run the command below. It should succeed.
.. code:: bash
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\",\"default\":\"\"}]}"}' \
http://localhost:8081/subjects/transactions-value/versions
The above ``curl`` command, if successful, returns the ``id`` of the new schema:
.. code:: bash
{"id":2}
#. View the latest schema for the subject ``transactions-value`` in |sr|:
.. code:: bash
curl --silent -X GET http://localhost:8081/subjects/transactions-value/versions/latest | jq .
This command returns the latest schema registered under the ``transactions-value`` subject, including its version number, id, and the schema definition as escaped JSON:
.. code:: bash
{
"subject": "transactions-value",
"version": 2,
"id": 2,
"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\",\"default\":\"\"}]}"
}
Notice the changes:
* ``version``: changed from ``1`` to ``2``
* ``id``: changed from ``1`` to ``2``
* ``schema``: updated with the new field ``region`` that has a default value
Changing Compatibility Type
^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. include:: includes/sr_change_compatibility_type.rst
If you prefer to connect directly to the REST endpoint in |sr|, then to change the compatibility type for the topic ``transactions``, i.e., for the subject ``transactions-value``, run the example command below.
.. code:: bash
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD_TRANSITIVE"}' \
http://localhost:8081/config/transactions-value
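
The practical difference between ``BACKWARD`` and ``BACKWARD_TRANSITIVE`` is which earlier versions a new schema is compared against: only the latest registered version, or every registered version. The sketch below illustrates that selection. The class ``CompatibilityScopeSketch`` is hypothetical and handles only these two compatibility types.

.. code:: java

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    /** Selects which registered versions a new schema is checked against (hypothetical). */
    public class CompatibilityScopeSketch {

        public static List<Integer> versionsToCheck(String compatibility,
                                                    List<Integer> registeredVersions) {
            if (registeredVersions.isEmpty()) {
                return Collections.emptyList(); // first schema for the subject, nothing to compare
            }
            switch (compatibility) {
                case "BACKWARD":
                    // Compare with the latest registered version only.
                    int last = registeredVersions.size() - 1;
                    return new ArrayList<>(registeredVersions.subList(last, last + 1));
                case "BACKWARD_TRANSITIVE":
                    // Compare with every registered version.
                    return new ArrayList<>(registeredVersions);
                default:
                    throw new IllegalArgumentException("Not covered by this sketch: " + compatibility);
            }
        }

        public static void main(String[] args) {
            List<Integer> versions = Arrays.asList(1, 2);
            System.out.println(versionsToCheck("BACKWARD", versions));            // [2]
            System.out.println(versionsToCheck("BACKWARD_TRANSITIVE", versions)); // [1, 2]
        }
    }
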
Next Steps
~~~~~~~~~~
.. include:: includes/sr_next_steps.rst
* Read the user guide on managing schemas on |c3|: :ref:`topicschema`
* For production deployments of |sr|, refer to :ref:`schema-registry-prod`.
* Work through the :ref:`cp-demo` to understand |sr| in the context of a full |cp| deployment, including various types of security enabled.