Source code for confluent_kafka.schema_registry._sync.schema_registry_client

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import logging
import os
import ssl
import time
import urllib
from typing import Any, Callable, Dict, List, Literal, Optional, Union
from urllib.parse import unquote, urlparse

import certifi
import httpx
from authlib.integrations.httpx_client import OAuth2Client
from cachetools import Cache, LRUCache, TTLCache
from httpx import Response

from confluent_kafka.schema_registry.common.schema_registry_client import (
    RegisteredSchema,
    Schema,
    SchemaVersion,
    ServerConfig,
    _BearerFieldProvider,
    _SchemaCache,
    _StaticFieldProvider,
    full_jitter,
    is_retriable,
    is_success,
)
from confluent_kafka.schema_registry.error import OAuthTokenError, SchemaRegistryError

__all__ = [
    '_urlencode',
    '_CustomOAuthClient',
    '_OAuthClient',
    '_BaseRestClient',
    '_RestClient',
    'SchemaRegistryClient',
]

# TODO: consider adding `six` dependency or employing a compat file
# Python 2.7 is officially EOL so compatibility issue will be come more the norm.
# We need a better way to handle these issues.
# Six is one possibility but the compat file pattern used by requests
# is also quite nice.
#
# six: https://pypi.org/project/six/
# compat file : https://github.com/psf/requests/blob/master/requests/compat.py
try:
    string_type = basestring  # type: ignore[name-defined]  # noqa

    def _urlencode(value: str) -> str:
        return urllib.quote(value, safe='')  # type: ignore[attr-defined]

except NameError:
    string_type = str

    def _urlencode(value: str) -> str:
        return urllib.parse.quote(value, safe='')


log = logging.getLogger(__name__)


class _CustomOAuthClient(_BearerFieldProvider):
    def __init__(self, custom_function: Callable[[Dict], Dict], custom_config: dict):
        self.custom_function = custom_function
        self.custom_config = custom_config

    def get_bearer_fields(self) -> dict:
        return self.custom_function(self.custom_config)


class _OAuthClient(_BearerFieldProvider):
    def __init__(
        self,
        client_id: str,
        client_secret: str,
        scope: str,
        token_endpoint: str,
        logical_cluster: str,
        identity_pool: str,
        max_retries: int,
        retries_wait_ms: int,
        retries_max_wait_ms: int,
    ):
        self.token = None
        self.logical_cluster = logical_cluster
        self.identity_pool = identity_pool
        self.client = OAuth2Client(client_id=client_id, client_secret=client_secret, scope=scope)
        self.token_endpoint = token_endpoint
        self.max_retries = max_retries
        self.retries_wait_ms = retries_wait_ms
        self.retries_max_wait_ms = retries_max_wait_ms
        self.token_expiry_threshold = 0.8

    def get_bearer_fields(self) -> dict:
        return {
            'bearer.auth.token': self.get_access_token(),
            'bearer.auth.logical.cluster': self.logical_cluster,
            'bearer.auth.identity.pool.id': self.identity_pool,
        }

    def token_expired(self) -> bool:
        if self.token is None:
            raise ValueError("Token is not set")

        expiry_window = self.token['expires_in'] * self.token_expiry_threshold

        return self.token['expires_at'] < time.time() + expiry_window

    def get_access_token(self) -> str:
        if not self.token or self.token_expired():
            self.generate_access_token()
        if self.token is None:
            raise ValueError("Token is not set after the attempt to generate it")
        return self.token['access_token']

    def generate_access_token(self) -> None:
        for i in range(self.max_retries + 1):
            try:
                self.token = self.client.fetch_token(url=self.token_endpoint, grant_type='client_credentials')
                return
            except Exception as e:
                if i >= self.max_retries:
                    raise OAuthTokenError(
                        f"Failed to retrieve token after {self.max_retries} " f"attempts due to error: {str(e)}"
                    )
                time.sleep(full_jitter(self.retries_wait_ms, self.retries_max_wait_ms, i) / 1000)


class _BaseRestClient(object):

    def __init__(self, conf: dict):
        # copy dict to avoid mutating the original
        conf_copy = conf.copy()

        base_url = conf_copy.pop('url', None)
        if base_url is None:
            raise ValueError("Missing required configuration property url")
        if not isinstance(base_url, string_type):
            raise TypeError("url must be a str, not " + str(type(base_url)))
        base_urls = []
        for url in base_url.split(','):
            url = url.strip().rstrip('/')
            if not url.startswith('http') and not url.startswith('mock'):
                raise ValueError("Invalid url {}".format(url))
            base_urls.append(url)
        if not base_urls:
            raise ValueError("Missing required configuration property url")
        self.base_urls = base_urls

        ca: Union[str, bool, None] = conf_copy.pop('ssl.ca.location', None)
        key: Optional[str] = conf_copy.pop('ssl.key.location', None)
        key_password: Optional[str] = conf_copy.pop('ssl.key.password', None)
        client_cert: Optional[str] = conf_copy.pop('ssl.certificate.location', None)

        # this mimicks legacy, deprecated behaviour of httpx
        # self.verify is always set to an ssl.SSLContext in case we need to load_cert_chain
        if ca is False:
            self.verify = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            self.verify.check_hostname = False
            self.verify.verify_mode = ssl.CERT_NONE
        elif isinstance(ca, str):
            if os.path.isdir(ca):
                self.verify = ssl.create_default_context(capath=ca)
            else:
                self.verify = ssl.create_default_context(cafile=ca)
        else:
            if os.environ.get("SSL_CERT_FILE"):
                self.verify = ssl.create_default_context(cafile=os.environ["SSL_CERT_FILE"])
            elif os.environ.get("SSL_CERT_DIR"):
                self.verify = ssl.create_default_context(capath=os.environ["SSL_CERT_DIR"])
            else:
                self.verify = ssl.create_default_context(cafile=certifi.where())

        if client_cert is not None:
            if key is not None and key_password is not None:
                self.verify.load_cert_chain(certfile=client_cert, keyfile=key, password=key_password)
            elif key is not None:
                self.verify.load_cert_chain(certfile=client_cert, keyfile=key)
            elif key_password is not None:
                self.verify.load_cert_chain(certfile=client_cert, password=key_password)
            else:
                self.verify.load_cert_chain(certfile=client_cert)

        if (key is not None or key_password is not None) and client_cert is None:
            raise ValueError(
                "ssl.certificate.location required when" " configuring ssl.key.location or ssl.key.password"
            )

        parsed = urlparse(self.base_urls[0])
        try:
            userinfo = (unquote(parsed.username), unquote(parsed.password))
        except (AttributeError, TypeError):
            userinfo = ("", "")
        if 'basic.auth.user.info' in conf_copy:
            if userinfo != ('', ''):
                raise ValueError(
                    "basic.auth.user.info configured with"
                    " userinfo credentials in the URL."
                    " Remove userinfo credentials from the url or"
                    " remove basic.auth.user.info from the"
                    " configuration"
                )

            userinfo = tuple(conf_copy.pop('basic.auth.user.info', '').split(':', 1))

            if len(userinfo) != 2:
                raise ValueError("basic.auth.user.info must be in the form" " of {username}:{password}")

        self.auth = userinfo if userinfo != ('', '') else None

        # The following adds support for proxy config
        # If specified: it uses the specified proxy details when making requests
        self.proxy = None
        proxy = conf_copy.pop('proxy', None)
        if proxy is not None:
            self.proxy = proxy

        self.timeout = None
        timeout = conf_copy.pop('timeout', None)
        if timeout is not None:
            self.timeout = timeout

        self.cache_capacity = 1000
        cache_capacity = conf_copy.pop('cache.capacity', None)
        if cache_capacity is not None:
            if not isinstance(cache_capacity, (int, float)):
                raise TypeError("cache.capacity must be a number, not " + str(type(cache_capacity)))
            self.cache_capacity = int(cache_capacity)

        self.cache_latest_ttl_sec = None
        cache_latest_ttl_sec = conf_copy.pop('cache.latest.ttl.sec', None)
        if cache_latest_ttl_sec is not None:
            if not isinstance(cache_latest_ttl_sec, (int, float)):
                raise TypeError("cache.latest.ttl.sec must be a number, not " + str(type(cache_latest_ttl_sec)))
            self.cache_latest_ttl_sec = cache_latest_ttl_sec

        self.max_retries = 3
        max_retries = conf_copy.pop('max.retries', None)
        if max_retries is not None:
            if not isinstance(max_retries, (int, float)):
                raise TypeError("max.retries must be a number, not " + str(type(max_retries)))
            self.max_retries = int(max_retries)

        self.retries_wait_ms = 1000
        retries_wait_ms = conf_copy.pop('retries.wait.ms', None)
        if retries_wait_ms is not None:
            if not isinstance(retries_wait_ms, (int, float)):
                raise TypeError("retries.wait.ms must be a number, not " + str(type(retries_wait_ms)))
            self.retries_wait_ms = int(retries_wait_ms)

        self.retries_max_wait_ms = 20000
        retries_max_wait_ms = conf_copy.pop('retries.max.wait.ms', None)
        if retries_max_wait_ms is not None:
            if not isinstance(retries_max_wait_ms, (int, float)):
                raise TypeError("retries.max.wait.ms must be a number, not " + str(type(retries_max_wait_ms)))
            self.retries_max_wait_ms = int(retries_max_wait_ms)

        logical_cluster = None
        identity_pool = None
        self.bearer_field_provider: Optional[_BearerFieldProvider] = None
        self.bearer_auth_credentials_source = conf_copy.pop('bearer.auth.credentials.source', None)
        if self.bearer_auth_credentials_source is not None:
            self.auth = None

            if self.bearer_auth_credentials_source in {'OAUTHBEARER', 'STATIC_TOKEN'}:
                headers = ['bearer.auth.logical.cluster', 'bearer.auth.identity.pool.id']
                missing_headers = [header for header in headers if header not in conf_copy]
                if missing_headers:
                    raise ValueError(
                        "Missing required bearer configuration properties: {}".format(", ".join(missing_headers))
                    )

                logical_cluster = conf_copy.pop('bearer.auth.logical.cluster')
                if not isinstance(logical_cluster, str):
                    raise TypeError("logical cluster must be a str, not " + str(type(logical_cluster)))

                identity_pool = conf_copy.pop('bearer.auth.identity.pool.id')
                if not isinstance(identity_pool, str):
                    raise TypeError("identity pool id must be a str, not " + str(type(identity_pool)))

                if self.bearer_auth_credentials_source == 'OAUTHBEARER':
                    properties_list = [
                        'bearer.auth.client.id',
                        'bearer.auth.client.secret',
                        'bearer.auth.scope',
                        'bearer.auth.issuer.endpoint.url',
                    ]
                    missing_properties = [prop for prop in properties_list if prop not in conf_copy]
                    if missing_properties:
                        raise ValueError(
                            "Missing required OAuth configuration properties: {}".format(", ".join(missing_properties))
                        )

                    self.client_id = conf_copy.pop('bearer.auth.client.id')
                    if not isinstance(self.client_id, string_type):
                        raise TypeError("bearer.auth.client.id must be a str, not " + str(type(self.client_id)))

                    self.client_secret = conf_copy.pop('bearer.auth.client.secret')
                    if not isinstance(self.client_secret, string_type):
                        raise TypeError("bearer.auth.client.secret must be a str, not " + str(type(self.client_secret)))

                    self.scope = conf_copy.pop('bearer.auth.scope')
                    if not isinstance(self.scope, string_type):
                        raise TypeError("bearer.auth.scope must be a str, not " + str(type(self.scope)))

                    self.token_endpoint = conf_copy.pop('bearer.auth.issuer.endpoint.url')
                    if not isinstance(self.token_endpoint, string_type):
                        raise TypeError(
                            "bearer.auth.issuer.endpoint.url must be a str, not " + str(type(self.token_endpoint))
                        )

                    self.bearer_field_provider = _OAuthClient(
                        self.client_id,
                        self.client_secret,
                        self.scope,
                        self.token_endpoint,
                        logical_cluster,
                        identity_pool,
                        self.max_retries,
                        self.retries_wait_ms,
                        self.retries_max_wait_ms,
                    )
                else:  # STATIC_TOKEN
                    if 'bearer.auth.token' not in conf_copy:
                        raise ValueError("Missing bearer.auth.token")
                    static_token = conf_copy.pop('bearer.auth.token')
                    self.bearer_field_provider = _StaticFieldProvider(static_token, logical_cluster, identity_pool)
                    if not isinstance(static_token, string_type):
                        raise TypeError("bearer.auth.token must be a str, not " + str(type(static_token)))
            elif self.bearer_auth_credentials_source == 'CUSTOM':
                custom_bearer_properties = [
                    'bearer.auth.custom.provider.function',
                    'bearer.auth.custom.provider.config',
                ]
                missing_custom_properties = [prop for prop in custom_bearer_properties if prop not in conf_copy]
                if missing_custom_properties:
                    raise ValueError(
                        "Missing required custom OAuth configuration properties: {}".format(
                            ", ".join(missing_custom_properties)
                        )
                    )

                custom_function = conf_copy.pop('bearer.auth.custom.provider.function')
                if not callable(custom_function):
                    raise TypeError(
                        "bearer.auth.custom.provider.function must be a callable, not " + str(type(custom_function))
                    )

                custom_config = conf_copy.pop('bearer.auth.custom.provider.config')
                if not isinstance(custom_config, dict):
                    raise TypeError(
                        "bearer.auth.custom.provider.config must be a dict, not " + str(type(custom_config))
                    )

                self.bearer_field_provider = _CustomOAuthClient(custom_function, custom_config)
            else:
                raise ValueError('Unrecognized bearer.auth.credentials.source')

        # Any leftover keys are unknown to _RestClient
        if len(conf_copy) > 0:
            raise ValueError("Unrecognized properties: {}".format(", ".join(conf_copy.keys())))

    def get(self, url: str, query: Optional[dict] = None) -> Any:
        raise NotImplementedError()

    def post(self, url: str, body: Optional[dict], **kwargs) -> Any:
        raise NotImplementedError()

    def delete(self, url: str) -> Any:
        raise NotImplementedError()

    def put(self, url: str, body: Optional[dict] = None) -> Any:
        raise NotImplementedError()


class _RestClient(_BaseRestClient):
    """
    HTTP client for Confluent Schema Registry.

    See SchemaRegistryClient for configuration details.

    Args:
        conf (dict): Dictionary containing _RestClient configuration
    """

    def __init__(self, conf: dict):
        super().__init__(conf)

        self.session = httpx.Client(verify=self.verify, auth=self.auth, proxy=self.proxy, timeout=self.timeout)

    def handle_bearer_auth(self, headers: dict) -> None:
        if self.bearer_field_provider is None:
            raise ValueError("Bearer field provider is not set")
        bearer_fields = self.bearer_field_provider.get_bearer_fields()
        required_fields = ['bearer.auth.token', 'bearer.auth.identity.pool.id', 'bearer.auth.logical.cluster']

        missing_fields = []
        for field in required_fields:
            if field not in bearer_fields:
                missing_fields.append(field)

        if missing_fields:
            raise ValueError(
                "Missing required bearer auth fields, needs to be set in config or custom function: {}".format(
                    ", ".join(missing_fields)
                )
            )

        headers["Authorization"] = "Bearer {}".format(bearer_fields['bearer.auth.token'])
        headers['Confluent-Identity-Pool-Id'] = bearer_fields['bearer.auth.identity.pool.id']
        headers['target-sr-cluster'] = bearer_fields['bearer.auth.logical.cluster']

    def get(self, url: str, query: Optional[dict] = None) -> Any:
        return self.send_request(url, method='GET', query=query)

    def post(self, url: str, body: Optional[dict], **kwargs) -> Any:
        return self.send_request(url, method='POST', body=body)

    def delete(self, url: str) -> Any:
        return self.send_request(url, method='DELETE')

    def put(self, url: str, body: Optional[dict] = None) -> Any:
        return self.send_request(url, method='PUT', body=body)

    def send_request(self, url: str, method: str, body: Optional[dict] = None, query: Optional[dict] = None) -> Any:
        """
        Sends HTTP request to the SchemaRegistry, trying each base URL in turn.

        All unsuccessful attempts will raise a SchemaRegistryError with the
        response contents. In most cases this will be accompanied by a
        Schema Registry supplied error code.

        In the event the response is malformed an error_code of -1 will be used.

        Args:
            url (str): Request path

            method (str): HTTP method

            body (str): Request content

            query (dict): Query params to attach to the URL

        Returns:
            dict: Schema Registry response content.
        """

        headers = {
            'Accept': "application/vnd.schemaregistry.v1+json,"
            " application/vnd.schemaregistry+json,"
            " application/json"
        }

        body_str: Optional[str] = None
        if body is not None:
            body_str = json.dumps(body)
            headers = {
                'Content-Length': str(len(body_str)),
                'Content-Type': "application/vnd.schemaregistry.v1+json",
                'Confluent-Accept-Unknown-Properties': "true",
            }

        if self.bearer_auth_credentials_source:
            self.handle_bearer_auth(headers)

        response = None
        for i, base_url in enumerate(self.base_urls):
            try:
                response = self.send_http_request(base_url, url, method, headers, body_str, query)

                if is_success(response.status_code):
                    return response.json()

                if not is_retriable(response.status_code) or i == len(self.base_urls) - 1:
                    break
            except Exception as e:
                if i == len(self.base_urls) - 1:
                    # Raise the exception since we have no more urls to try
                    raise e

        if isinstance(response, Response):
            try:
                raise SchemaRegistryError(
                    response.status_code, response.json().get('error_code'), response.json().get('message')
                )
            except (ValueError, KeyError, AttributeError):
                raise SchemaRegistryError(
                    response.status_code, -1, "Unknown Schema Registry Error: " + str(response.content)
                )
        else:
            raise TypeError("Unexpected response of unsupported type: " + str(type(response)))

    def send_http_request(
        self,
        base_url: str,
        url: str,
        method: str,
        headers: Optional[dict],
        body: Optional[str] = None,
        query: Optional[dict] = None,
    ) -> Response:
        """
        Sends HTTP request to the SchemaRegistry.

        All unsuccessful attempts will raise a SchemaRegistryError with the
        response contents. In most cases this will be accompanied by a
        Schema Registry supplied error code.

        In the event the response is malformed an error_code of -1 will be used.

        Args:
            base_url (str): Schema Registry base URL

            url (str): Request path

            method (str): HTTP method

            headers (dict): Headers

            body (str): Request content

            query (dict): Query params to attach to the URL

        Returns:
            Response: Schema Registry response content.
        """
        response = None
        for i in range(self.max_retries + 1):
            response = self.session.request(
                method, url="/".join([base_url, url]), headers=headers, content=body, params=query
            )

            if is_success(response.status_code):
                return response

            if not is_retriable(response.status_code) or i >= self.max_retries:
                return response

            time.sleep(full_jitter(self.retries_wait_ms, self.retries_max_wait_ms, i) / 1000)
        return response  # type: ignore[return-value]


[docs] class SchemaRegistryClient(object): """ A Confluent Schema Registry client. Configuration properties (* indicates a required field): +------------------------------+------+-------------------------------------------------+ | Property name | type | Description | +==============================+======+=================================================+ | ``url`` * | str | Comma-separated list of Schema Registry URLs. | +------------------------------+------+-------------------------------------------------+ | | | Path to CA certificate file used | | ``ssl.ca.location`` | str | to verify the Schema Registry's | | | | private key. | +------------------------------+------+-------------------------------------------------+ | | | Path to client's private key | | | | (PEM) used for authentication. | | ``ssl.key.location`` | str | | | | | ``ssl.certificate.location`` must also be set. | +------------------------------+------+-------------------------------------------------+ | | | Password to use to decrypt the client's private | | | | key. | | | | | | ``ssl.key.password`` | str | The private key may be provided using | | | | ``ssl.key.location``, or bundled with the | | | | certificate in ``ssl.certificate.location``. | | | | Password is optional (key may be unencrypted). | +------------------------------+------+-------------------------------------------------+ | | | Path to client's certificate (PEM) used for | | | | authentication. | | ``ssl.certificate.location`` | str | | | | | May be set without ``ssl.key.location`` if the | | | | private key is stored within the PEM as well. | +------------------------------+------+-------------------------------------------------+ | | | Client HTTP credentials in the form of | | | | ``username:password``. | | ``basic.auth.user.info`` | str | | | | | By default userinfo is extracted from | | | | the URL if present. | +------------------------------+------+-------------------------------------------------+ | | | | | ``proxy`` | str | Proxy such as http://localhost:8030. | | | | | +------------------------------+------+-------------------------------------------------+ | | | | | ``timeout`` | int | Request timeout. | | | | | +------------------------------+------+-------------------------------------------------+ | | | | | ``cache.capacity`` | int | Cache capacity. Defaults to 1000. | | | | | +------------------------------+------+-------------------------------------------------+ | | | | | ``cache.latest.ttl.sec`` | int | TTL in seconds for caching the latest schema. | | | | | +------------------------------+------+-------------------------------------------------+ | | | | | ``max.retries`` | int | Maximum retries for a request. Defaults to 2. | | | | | +------------------------------+------+-------------------------------------------------+ | | | Maximum time to wait for the first retry. | | | | When jitter is applied, the actual wait may | | ``retries.wait.ms`` | int | be less. | | | | | | | | Defaults to 1000. | +------------------------------+------+-------------------------------------------------+ Args: conf (dict): Schema Registry client configuration. See Also: `Confluent Schema Registry documentation <http://confluent.io/docs/current/schema-registry/docs/intro.html>`_ """ # noqa: E501 def __init__(self, conf: dict): self._conf = conf self._rest_client = _RestClient(conf) self._cache = _SchemaCache() cache_capacity = self._rest_client.cache_capacity cache_ttl = self._rest_client.cache_latest_ttl_sec self._latest_version_cache: Cache[Any, Any] self._latest_with_metadata_cache: Cache[Any, Any] if cache_ttl is not None: self._latest_version_cache = TTLCache(cache_capacity, cache_ttl) self._latest_with_metadata_cache = TTLCache(cache_capacity, cache_ttl) else: self._latest_version_cache = LRUCache(cache_capacity) self._latest_with_metadata_cache = LRUCache(cache_capacity) def __enter__(self): return self def __exit__(self, *args): if self._rest_client is not None: self._rest_client.session.close() def config(self): return self._conf
[docs] def register_schema(self, subject_name: str, schema: 'Schema', normalize_schemas: bool = False) -> int: """ Registers a schema under ``subject_name``. Args: subject_name (str): subject to register a schema under schema (Schema): Schema instance to register normalize_schemas (bool): Normalize schema before registering Returns: int: Schema id Raises: SchemaRegistryError: if Schema violates this subject's Compatibility policy or is otherwise invalid. See Also: `POST Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions>`_ """ # noqa: E501 registered_schema = self.register_schema_full_response( subject_name, schema, normalize_schemas=normalize_schemas ) return registered_schema.schema_id # type: ignore[return-value]
[docs] def register_schema_full_response( self, subject_name: str, schema: 'Schema', normalize_schemas: bool = False ) -> 'RegisteredSchema': """ Registers a schema under ``subject_name``. Args: subject_name (str): subject to register a schema under schema (Schema): Schema instance to register normalize_schemas (bool): Normalize schema before registering Returns: int: Schema id Raises: SchemaRegistryError: if Schema violates this subject's Compatibility policy or is otherwise invalid. See Also: `POST Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions>`_ """ # noqa: E501 schema_id = self._cache.get_id_by_schema(subject_name, schema) if schema_id is not None: result = self._cache.get_schema_by_id(subject_name, schema_id) if result is not None: return RegisteredSchema( schema_id=schema_id, guid=result[0], subject=subject_name, version=None, schema=result[1] ) request = schema.to_dict() response = self._rest_client.post( 'subjects/{}/versions?normalize={}'.format(_urlencode(subject_name), normalize_schemas), body=request ) response_schema = RegisteredSchema.from_dict(response) registered_schema = RegisteredSchema( schema_id=response_schema.schema_id, guid=response_schema.guid, subject=response_schema.subject or subject_name, version=response_schema.version, schema=response_schema.schema, ) # The registered schema may not be fully populated s = registered_schema.schema if registered_schema.schema.schema_str is not None else schema self._cache.set_schema(subject_name, registered_schema.schema_id, registered_schema.guid, s) return registered_schema
[docs] def get_schema( self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None, reference_format: Optional[str] = None, ) -> 'Schema': """ Fetches the schema associated with ``schema_id`` from the Schema Registry. The result is cached so subsequent attempts will not require an additional round-trip to the Schema Registry. Args: schema_id (int): Schema id. subject_name (str): Subject name the schema is registered under. fmt (str): Desired output format, dependent on schema type. reference_format (str): Desired output format for references. Returns: Schema: Schema instance identified by the ``schema_id`` Raises: SchemaRegistryError: If schema can't be found. See Also: `GET Schema API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id>`_ """ # noqa: E501 result = self._cache.get_schema_by_id(subject_name, schema_id) if result is not None: return result[1] query = {} if subject_name is not None: query['subject'] = subject_name if fmt is not None: query['format'] = fmt if reference_format is not None: query['reference_format'] = reference_format response = self._rest_client.get('schemas/ids/{}'.format(schema_id), query) registered_schema = RegisteredSchema.from_dict(response) self._cache.set_schema(subject_name, schema_id, registered_schema.guid, registered_schema.schema) return registered_schema.schema
[docs] def get_schema_by_guid(self, guid: str, fmt: Optional[str] = None) -> 'Schema': """ Fetches the schema associated with ``guid`` from the Schema Registry. The result is cached so subsequent attempts will not require an additional round-trip to the Schema Registry. Args: guid (str): Schema guid fmt (str): Format of the schema Returns: Schema: Schema instance identified by the ``guid`` Raises: SchemaRegistryError: If schema can't be found. See Also: `GET Schema API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id>`_ """ # noqa: E501 schema = self._cache.get_schema_by_guid(guid) if schema is not None: return schema query = {} if fmt is not None: query['format'] = fmt response = self._rest_client.get('schemas/guids/{}'.format(guid), query) registered_schema = RegisteredSchema.from_dict(response) self._cache.set_schema(None, registered_schema.schema_id, registered_schema.guid, registered_schema.schema) return registered_schema.schema
[docs] def get_schema_types(self) -> List[str]: """ Lists all supported schema types in the Schema Registry. Returns: list(str): List of supported schema types (e.g., ['AVRO', 'JSON', 'PROTOBUF']) Raises: SchemaRegistryError: if schema types can't be retrieved See Also: `GET Schema Types API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-types>`_ """ # noqa: E501 return self._rest_client.get('schemas/types')
[docs] def get_subjects_by_schema_id( self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False, offset: int = 0, limit: int = -1, ) -> List[str]: """ Retrieves all the subjects associated with ``schema_id``. Args: schema_id (int): Schema ID. subject_name (str): Subject name that results can be filtered by. deleted (bool): Whether to include subjects where the schema was deleted. offset (int): Pagination offset for results. limit (int): Pagination size for results. Ignored if negative. Returns: list(str): List of subjects matching the specified parameters. Raises: SchemaRegistryError: if subjects can't be found """ query: dict[str, Any] = {'offset': offset, 'limit': limit} if subject_name is not None: query['subject'] = subject_name if deleted: query['deleted'] = deleted return self._rest_client.get('schemas/ids/{}/subjects'.format(schema_id), query)
[docs] def get_schema_versions( self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False, offset: int = 0, limit: int = -1, ) -> List[SchemaVersion]: """ Gets all subject-version pairs of a schema by its ID. Args: schema_id (int): Schema ID. subject_name (str): Subject name that results can be filtered by. deleted (bool): Whether to include subject versions where the schema was deleted. offset (int): Pagination offset for results. limit (int): Pagination size for results. Ignored if negative. Returns: list(SchemaVersion): List of subject-version pairs. Each pair contains: - subject (str): Subject name. - version (int): Version number. Raises: SchemaRegistryError: if schema versions can't be found. See Also: `GET Schema Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id-versions>`_ """ # noqa: E501 query: dict[str, Any] = {'offset': offset, 'limit': limit} if subject_name is not None: query['subject'] = subject_name if deleted: query['deleted'] = deleted response = self._rest_client.get('schemas/ids/{}/versions'.format(schema_id), query) return [SchemaVersion.from_dict(item) for item in response]
[docs] def lookup_schema( self, subject_name: str, schema: 'Schema', normalize_schemas: bool = False, fmt: Optional[str] = None, deleted: bool = False, ) -> 'RegisteredSchema': """ Returns ``schema`` registration information for ``subject``. Args: subject_name (str): Subject name the schema is registered under. schema (Schema): Schema instance. normalize_schemas (bool): Normalize schema before registering. fmt (str): Desired output format, dependent on schema type. deleted (bool): Whether to include deleted schemas. Returns: RegisteredSchema: Subject registration information for this schema. Raises: SchemaRegistryError: If schema or subject can't be found. See Also: `POST Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)>`_ """ # noqa: E501 registered_schema = self._cache.get_registered_by_subject_schema(subject_name, schema) if registered_schema is not None: return registered_schema request = schema.to_dict() query_params: dict[str, Any] = {'normalize': normalize_schemas, 'deleted': deleted} if fmt is not None: query_params['format'] = fmt query_string = '&'.join(f"{key}={value}" for key, value in query_params.items()) response = self._rest_client.post('subjects/{}?{}'.format(_urlencode(subject_name), query_string), body=request) result = RegisteredSchema.from_dict(response) # Ensure the schema matches the input registered_schema = RegisteredSchema( schema_id=result.schema_id, guid=result.guid, subject=result.subject, version=result.version, schema=schema, ) self._cache.set_registered_schema(schema, registered_schema) return registered_schema
[docs] def get_subjects( self, subject_prefix: Optional[str] = None, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1, ) -> List[str]: """ Lists all subjects registered with the Schema Registry. Args: subject_prefix (str): Subject name prefix that results can be filtered by. deleted (bool): Whether to include deleted subjects. deleted_only (bool): Whether to return deleted subjects only. If both deleted and deleted_only are True, deleted_only takes precedence. offset (int): Pagination offset for results. limit (int): Pagination size for results. Ignored if negative. Returns: list(str): Registered subject names Raises: SchemaRegistryError: if subjects can't be found See Also: `GET subjects API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects>`_ """ # noqa: E501 query: dict[str, Any] = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit} if subject_prefix is not None: query['subject'] = subject_prefix return self._rest_client.get('subjects', query)
[docs] def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int]: """ Deletes the specified subject and its associated compatibility level if registered. It is recommended to use this API only when a topic needs to be recycled or in development environments. Args: subject_name (str): subject name permanent (bool): True for a hard delete, False (default) for a soft delete Returns: list(int): Versions deleted under this subject Raises: SchemaRegistryError: if the request was unsuccessful. See Also: `DELETE Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)>`_ """ # noqa: E501 if permanent: versions = self._rest_client.delete('subjects/{}?permanent=true'.format(_urlencode(subject_name))) self._cache.remove_by_subject(subject_name) else: versions = self._rest_client.delete('subjects/{}'.format(_urlencode(subject_name))) return versions
[docs] def get_latest_version(self, subject_name: str, fmt: Optional[str] = None) -> 'RegisteredSchema': """ Retrieves latest registered version for subject Args: subject_name (str): Subject name. fmt (str): Format of the schema Returns: RegisteredSchema: Registration information for this version. Raises: SchemaRegistryError: if the version can't be found or is invalid. See Also: `GET Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)>`_ """ # noqa: E501 registered_schema = self._latest_version_cache.get(subject_name, None) if registered_schema is not None: return registered_schema query = {'format': fmt} if fmt is not None else None response = self._rest_client.get('subjects/{}/versions/{}'.format(_urlencode(subject_name), 'latest'), query) registered_schema = RegisteredSchema.from_dict(response) self._latest_version_cache[subject_name] = registered_schema return registered_schema
[docs] def get_latest_with_metadata( self, subject_name: str, metadata: Dict[str, str], deleted: bool = False, fmt: Optional[str] = None ) -> 'RegisteredSchema': """ Retrieves latest registered version for subject with the given metadata Args: subject_name (str): Subject name. metadata (dict): The key-value pairs for the metadata. deleted (bool): Whether to include deleted schemas. fmt (str): Format of the schema Returns: RegisteredSchema: Registration information for this version. Raises: SchemaRegistryError: if the version can't be found or is invalid. """ # noqa: E501 cache_key = (subject_name, frozenset(metadata.items()), deleted) registered_schema = self._latest_with_metadata_cache.get(cache_key, None) if registered_schema is not None: return registered_schema query: dict[str, Any] = {'deleted': deleted} if fmt is not None: query['format'] = fmt keys = metadata.keys() if keys: query['key'] = [_urlencode(key) for key in keys] query['value'] = [_urlencode(metadata[key]) for key in keys] response = self._rest_client.get('subjects/{}/metadata'.format(_urlencode(subject_name)), query) registered_schema = RegisteredSchema.from_dict(response) self._latest_with_metadata_cache[cache_key] = registered_schema return registered_schema
[docs] def get_version( self, subject_name: str, version: Union[int, Literal["latest"]] = "latest", deleted: bool = False, fmt: Optional[str] = None, ) -> 'RegisteredSchema': """ Retrieves a specific schema registered under `subject_name` and `version`. Args: subject_name (str): Subject name. version (Union[int, Literal["latest"]]): Version of the schema or string "latest". Defaults to latest version. deleted (bool): Whether to include deleted schemas. fmt (str): Format of the schema. Returns: RegisteredSchema: Registration information for this version. Raises: SchemaRegistryError: if the version can't be found or is invalid. See Also: `GET Subject Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)>`_ """ # noqa: E501 if version != "latest": # Skip cache lookup for reading the latest version registered_schema = self._cache.get_registered_by_subject_version(subject_name, version) if registered_schema is not None: return registered_schema query: dict[str, Any] = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted} response = self._rest_client.get('subjects/{}/versions/{}'.format(_urlencode(subject_name), version), query) registered_schema = RegisteredSchema.from_dict(response) self._cache.set_registered_schema(registered_schema.schema, registered_schema) return registered_schema
[docs] def get_referenced_by( self, subject_name: str, version: Union[int, Literal["latest"]] = "latest", offset: int = 0, limit: int = -1 ) -> List[int]: """ Get a list of IDs of schemas that reference the schema with the given `subject_name` and `version`. Args: subject_name (str): Subject name version (Union[int, Literal["latest"]]): Version number or "latest" offset (int): Pagination offset for results. limit (int): Pagination size for results. Ignored if negative. Returns: list(int): List of schema IDs that reference the specified schema. Raises: SchemaRegistryError: if the schema version can't be found or referenced schemas can't be retrieved See Also: `GET Subject Versions (ReferenceBy) API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-versionId-%20version-referencedby>`_ """ # noqa: E501 query: dict[str, Any] = {'offset': offset, 'limit': limit} return self._rest_client.get( 'subjects/{}/versions/{}/referencedby'.format(_urlencode(subject_name), version), query )
[docs] def get_versions( self, subject_name: str, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1 ) -> List[int]: """ Get a list of all versions registered with this subject. Args: subject_name (str): Subject name. deleted (bool): Whether to include deleted schemas. deleted_only (bool): Whether to return deleted versions only. If both deleted and deleted_only are True, deleted_only takes precedence. offset (int): Pagination offset for results. limit (int): Pagination size for results. Ignored if negative. Returns: list(int): Registered versions Raises: SchemaRegistryError: If subject can't be found See Also: `GET Subject All Versions API Reference <https://docs.confluent.io/platform/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions>`_ """ # noqa: E501 query: dict[str, Any] = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit} return self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)), query)
[docs] def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int: """ Deletes a specific version registered to ``subject_name``. Args: subject_name (str) Subject name version (int): Version number permanent (bool): True for a hard delete, False (default) for a soft delete Returns: int: Version number which was deleted Raises: SchemaRegistryError: if the subject or version cannot be found. See Also: `Delete Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)-versions-(versionId-%20version)>`_ """ # noqa: E501 if permanent: response = self._rest_client.delete( 'subjects/{}/versions/{}?permanent=true'.format(_urlencode(subject_name), version) ) else: response = self._rest_client.delete('subjects/{}/versions/{}'.format(_urlencode(subject_name), version)) # Clear cache for both soft and hard deletes to maintain consistency self._cache.remove_by_subject_version(subject_name, version) return response
[docs] def set_compatibility(self, subject_name: Optional[str] = None, level: Optional[str] = None) -> str: """ Update global or subject level compatibility level. Args: level (str): Compatibility level. See API reference for a list of valid values. subject_name (str, optional): Subject to update. Sets global compatibility level policy if not set. Returns: str: The newly configured compatibility level. Raises: SchemaRegistryError: If the compatibility level is invalid. See Also: `PUT Subject Compatibility API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#put--config-(string-%20subject)>`_ """ # noqa: E501 if level is None: raise ValueError("level must be set") if subject_name is None: return self._rest_client.put('config', body={'compatibility': level.upper()}) return self._rest_client.put( 'config/{}'.format(_urlencode(subject_name)), body={'compatibility': level.upper()} )
[docs] def get_compatibility(self, subject_name: Optional[str] = None) -> str: """ Get the current compatibility level. Args: subject_name (str, optional): Subject name. Returns global policy if left unset. Returns: str: Compatibility level for the subject if set, otherwise the global compatibility level. Raises: SchemaRegistryError: if the request was unsuccessful. See Also: `GET Subject Compatibility API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--config-(string-%20subject)>`_ """ # noqa: E501 if subject_name is not None: url = 'config/{}'.format(_urlencode(subject_name)) else: url = 'config' result = self._rest_client.get(url) return result['compatibilityLevel']
[docs] def test_compatibility( self, subject_name: str, schema: 'Schema', version: Union[int, str] = "latest", normalize: bool = False, verbose: bool = False, ) -> bool: """ Test the compatibility of a candidate schema for a given subject and version Args: subject_name (str): Subject name the schema is registered under schema (Schema): Schema instance. version (int or str, optional): Version number, or the string "latest". Defaults to "latest". normalize (bool): Whether to normalize the input schema. verbose (bool): Whether to return detailed error messages. Returns: bool: True if the schema is compatible with the specified version Raises: SchemaRegistryError: if the request was unsuccessful. See Also: `POST Test Compatibility API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--compatibility-subjects-(string-%20subject)-versions-(versionId-%20version)>`_ """ # noqa: E501 request = schema.to_dict() response = self._rest_client.post( 'compatibility/subjects/{}/versions/{}?normalize={}&verbose={}'.format( _urlencode(subject_name), version, normalize, verbose ), body=request, ) return response['is_compatible']
[docs] def test_compatibility_all_versions( self, subject_name: str, schema: 'Schema', normalize: bool = False, verbose: bool = False ) -> bool: """ Test the input schema against all schema versions under the subject (depending on the compatibility level set). Args: subject_name (str): Subject of the schema versions against which compatibility is to be tested. schema (Schema): Schema instance. normalize (bool): Whether to normalize the input schema. verbose (bool): Whether to return detailed error messages. Returns: bool: True if the schema is compatible with all of the subject's schemas versions. See Also: `POST Test Compatibility Against All API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--compatibility-subjects-(string-%20subject)-versions>`_ """ # noqa: E501 request = schema.to_dict() response = self._rest_client.post( 'compatibility/subjects/{}/versions?normalize={}&verbose={}'.format( _urlencode(subject_name), normalize, verbose ), body=request, ) return response['is_compatible']
[docs] def set_config(self, subject_name: Optional[str] = None, config: Optional['ServerConfig'] = None) -> 'ServerConfig': """ Update global or subject config. Args: config (ServerConfig): Config. See API reference for a list of valid values. subject_name (str, optional): Subject to update. Sets global config if not set. Returns: str: The newly configured config. Raises: SchemaRegistryError: If the config is invalid. See Also: `PUT Subject Config API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#put--config-(string-%20subject)>`_ """ # noqa: E501 if config is None: raise ValueError("config must be set") if subject_name is None: return self._rest_client.put('config', body=config.to_dict()) return self._rest_client.put('config/{}'.format(_urlencode(subject_name)), body=config.to_dict())
[docs] def delete_config(self, subject_name: Optional[str] = None) -> 'ServerConfig': """ Delete the specified subject-level compatibility level config and revert to the global default. Args: subject_name (str, optional): Subject name. Deletes global config if left unset. Returns: ServerConfig: The old deleted config Raises: SchemaRegistryError: if the request was unsuccessful. See Also: `DELETE Subject Config API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--config-(string- subject)>`_ """ # noqa: E501 if subject_name is not None: url = 'config/{}'.format(_urlencode(subject_name)) else: url = 'config' result = self._rest_client.delete(url) return ServerConfig.from_dict(result)
[docs] def get_config(self, subject_name: Optional[str] = None) -> 'ServerConfig': """ Get the current config. Args: subject_name (str, optional): Subject name. Returns global config if left unset. Returns: ServerConfig: Config for the subject if set, otherwise the global config. Raises: SchemaRegistryError: if the request was unsuccessful. See Also: `GET Subject Config API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--config-(string-%20subject)>`_ """ # noqa: E501 if subject_name is not None: url = 'config/{}'.format(_urlencode(subject_name)) else: url = 'config' result = self._rest_client.get(url) return ServerConfig.from_dict(result)
[docs] def get_mode(self, subject_name: str) -> str: """ Get the mode for a subject. Args: subject_name (str): Subject name. Returns: str: Mode for the subject. Returns one of IMPORT, READONLY, READWRITE (default). Raises: SchemaRegistryError: if the request was unsuccessful. See Also: `GET Subject Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--mode-(string-%20subject)>`_ """ # noqa: E501 result = self._rest_client.get('mode/{}'.format(_urlencode(subject_name))) return result['mode']
[docs] def update_mode(self, subject_name: str, mode: str, force: bool = False) -> str: """ Update the mode for a subject. Args: subject_name (str): Subject name. mode (str): Mode to update. force (bool): Whether to force a mode change even if the Schema Registry has existing schemas. Returns: str: New mode for the subject. Must be one of IMPORT, READONLY, READWRITE (default). Raises: SchemaRegistryError: if the request was unsuccessful. See Also: `PUT Subject Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#put--mode-(string-%20subject)>`_ """ # noqa: E501 result = self._rest_client.put( 'mode/{}?force={}'.format(_urlencode(subject_name), force), body={'mode': mode}, ) return result['mode']
[docs] def delete_mode(self, subject_name: str) -> str: """ Delete the mode for a subject and revert to the global default Args: subject_name (str): Subject name. Returns: str: New mode for the subject. Must be one of IMPORT, READONLY, READWRITE (default). Raises: SchemaRegistryError: if the request was unsuccessful. See Also: `DELETE Subject Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--mode-(string-%20subject)>`_ """ # noqa: E501 result = self._rest_client.delete('mode/{}'.format(_urlencode(subject_name))) return result['mode']
[docs] def get_global_mode(self) -> str: """ Get the current mode for Schema Registry at a global level. Returns: str: Schema Registry mode. Must be one of IMPORT, READONLY, READWRITE (default). Raises: SchemaRegistryError: if the request was unsuccessful. See Also: `GET Global Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--mode>`_ """ # noqa: E501 result = self._rest_client.get('mode') return result['mode']
[docs] def update_global_mode(self, mode: str, force: bool = False) -> str: """ Update the mode for the Schema Registry at a global level. Args: mode (str): Mode to update. force (bool): Whether to force a mode change even if the Schema Registry has existing schemas. Returns: str: New mode for the Schema Registry. Must be one of IMPORT, READONLY, READWRITE (default). Raises: SchemaRegistryError: if the request was unsuccessful. See Also: `PUT Global Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#put--mode>`_ """ # noqa: E501 result = self._rest_client.put('mode?force={}'.format(force), body={'mode': mode}) return result['mode']
[docs] def get_contexts(self, offset: int = 0, limit: int = -1) -> List[str]: """ Retrieves a list of contexts. Args: offset (int): Pagination offset for results. limit (int): Pagination size for results. Ignored if negative. Returns: List[str]: List of contexts. Raises: SchemaRegistryError: if the request was unsuccessful. """ # noqa: E501 result = self._rest_client.get('contexts', query={'offset': offset, 'limit': limit}) return result
def clear_latest_caches(self): self._latest_version_cache.clear() self._latest_with_metadata_cache.clear() def clear_caches(self): self._latest_version_cache.clear() self._latest_with_metadata_cache.clear() self._cache.clear() @staticmethod def new_client(conf: dict) -> 'SchemaRegistryClient': from .mock_schema_registry_client import MockSchemaRegistryClient url = conf.get("url") if url and isinstance(url, str) and url.startswith("mock://"): return MockSchemaRegistryClient(conf) return SchemaRegistryClient(conf)