#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import logging
import urllib
from collections import defaultdict
from threading import Lock
from requests import (Session,
utils)
from .error import SchemaRegistryError
# TODO: consider adding `six` dependency or employing a compat file
# Python 2.7 is officially EOL so compatibility issue will be come more the norm.
# We need a better way to handle these issues.
# Six is one possibility but the compat file pattern used by requests
# is also quite nice.
#
# six: https://pypi.org/project/six/
# compat file : https://github.com/psf/requests/blob/master/requests/compat.py
try:
string_type = basestring # noqa
def _urlencode(value):
return urllib.quote(value, safe='')
except NameError:
string_type = str
def _urlencode(value):
return urllib.parse.quote(value, safe='')
log = logging.getLogger(__name__)
VALID_AUTH_PROVIDERS = ['URL', 'USER_INFO']
class _RestClient(object):
"""
HTTP client for Confluent Schema Registry.
See SchemaRegistryClient for configuration details.
Args:
conf (dict): Dictionary containing _RestClient configuration
"""
def __init__(self, conf):
self.session = Session()
# copy dict to avoid mutating the original
conf_copy = conf.copy()
base_url = conf_copy.pop('url', None)
if base_url is None:
raise ValueError("Missing required configuration property url")
if not isinstance(base_url, string_type):
raise TypeError("url must be an instance of str, not "
+ str(type(base_url)))
if not base_url.startswith('http'):
raise ValueError("Invalid url {}".format(base_url))
self.base_url = base_url.rstrip('/')
# The following configs map Requests Session class properties.
# See the API docs for specifics.
# https://requests.readthedocs.io/en/master/api/#request-sessions
ca = conf_copy.pop('ssl.ca.location', None)
if ca is not None:
self.session.verify = ca
key = conf_copy.pop('ssl.key.location', None)
cert = conf_copy.pop('ssl.certificate.location', None)
if cert is not None and key is not None:
self.session.cert = (cert, key)
if cert is not None and key is None:
self.session.cert = cert
if key is not None and cert is None:
raise ValueError("ssl.certificate.location required when"
" configuring ssl.key.location")
userinfo = utils.get_auth_from_url(base_url)
if 'basic.auth.user.info' in conf_copy:
if userinfo != ('', ''):
raise ValueError("basic.auth.user.info configured with"
" userinfo credentials in the URL."
" Remove userinfo credentials from the url or"
" remove basic.auth.user.info from the"
" configuration")
userinfo = tuple(conf_copy.pop('basic.auth.user.info', '').split(':'))
if len(userinfo) != 2:
raise ValueError("basic.auth.user.info must be in the form"
" of {username}:{password}")
self.session.auth = userinfo if userinfo != ('', '') else None
# Any leftover keys are unknown to _RestClient
if len(conf_copy) > 0:
raise ValueError("Unrecognized properties: {}"
.format(", ".join(conf_copy.keys())))
def _close(self):
self.session.close()
def get(self, url, query=None):
return self.send_request(url, method='GET', query=query)
def post(self, url, body, **kwargs):
return self.send_request(url, method='POST', body=body)
def delete(self, url):
return self.send_request(url, method='DELETE')
def put(self, url, body=None):
return self.send_request(url, method='PUT', body=body)
def send_request(self, url, method, body=None, query=None):
"""
Sends HTTP request to the SchemaRegistry.
All unsuccessful attempts will raise a SchemaRegistryError with the
response contents. In most cases this will be accompanied with a
Schema Registry supplied error code.
In the event the response is malformed an error_code of -1 will be used.
Args:
url (str): Request path
method (str): HTTP method
body (str): Request content
query (dict): Query params to attach to the URL
Returns:
dict: Schema Registry response content.
"""
headers = {'Accept': "application/vnd.schemaregistry.v1+json,"
" application/vnd.schemaregistry+json,"
" application/json"}
if body is not None:
body = json.dumps(body)
headers = {'Content-Length': str(len(body)),
'Content-Type': "application/vnd.schemaregistry.v1+json"}
response = self.session.request(
method, url="/".join([self.base_url, url]),
headers=headers, data=body, params=query)
try:
if 200 <= response.status_code <= 299:
return response.json()
raise SchemaRegistryError(response.status_code,
response.json().get('error_code'),
response.json().get('message'))
# Schema Registry may return malformed output when it hits unexpected errors
except (ValueError, KeyError, AttributeError):
raise SchemaRegistryError(response.status_code,
-1,
"Unknown Schema Registry Error: "
+ str(response.content))
class _SchemaCache(object):
"""
Thread-safe cache for use with the Schema Registry Client.
This cache may be used to retrieve schema ids, schemas or to check
known subject membership.
"""
def __init__(self):
self.lock = Lock()
self.schema_id_index = {}
self.schema_index = {}
self.subject_schemas = defaultdict(set)
def set(self, schema_id, schema, subject_name=None):
"""
Add a Schema identified by schema_id to the cache.
Args:
schema_id (int): Schema's registration id
schema (Schema): Schema instance
subject_name(str): Optional, subject schema is registered under
Returns:
int: The schema_id
"""
with self.lock:
self.schema_id_index[schema_id] = schema
self.schema_index[schema] = schema_id
if subject_name is not None:
self.subject_schemas[subject_name].add(schema)
def get_schema(self, schema_id):
"""
Get the schema instance associated with schema_id from the cache.
Args:
schema_id (int): Id used to identify a schema
Returns:
Schema: The schema if known; else None
"""
return self.schema_id_index.get(schema_id, None)
def get_schema_id_by_subject(self, subject, schema):
"""
Get the schema_id associated with this schema registered under subject.
Args:
subject (str): The subject this schema is associated with
schema (Schema): The schema associated with this schema_id
Returns:
int: Schema ID if known; else None
"""
with self.lock:
if schema in self.subject_schemas[subject]:
return self.schema_index.get(schema, None)
[docs]class SchemaRegistryClient(object):
"""
A Confluent Schema Registry client.
Configuration properties (* indicates a required field):
+------------------------------+------+-------------------------------------------------+
| Property name | type | Description |
+==============================+======+=================================================+
| ``url`` * | str | Schema Registry URL. |
+------------------------------+------+-------------------------------------------------+
| | | Path to CA certificate file used |
| ``ssl.ca.location`` | str | to verify the Schema Registry's |
| | | private key. |
+------------------------------+------+-------------------------------------------------+
| | | Path to client's private key |
| | | (PEM) used for authentication. |
| ``ssl.key.location`` | str | |
| | | ``ssl.certificate.location`` must also be set. |
+------------------------------+------+-------------------------------------------------+
| | | Path to client's public key (PEM) used for |
| | | authentication. |
| ``ssl.certificate.location`` | str | |
| | | May be set without ssl.key.location if the |
| | | private key is stored within the PEM as well. |
+------------------------------+------+-------------------------------------------------+
| | | Client HTTP credentials in the form of |
| | | ``username:password``. |
| ``basic.auth.user.info`` | str | |
| | | By default userinfo is extracted from |
| | | the URL if present. |
+------------------------------+------+-------------------------------------------------+
Args:
conf (dict): Schema Registry client configuration.
See Also:
`Confluent Schema Registry documentation <http://confluent.io/docs/current/schema-registry/docs/intro.html>`_
""" # noqa: E501
def __init__(self, conf):
self._rest_client = _RestClient(conf)
self._cache = _SchemaCache()
def __enter__(self):
return self
def __exit__(self, *args):
if self._rest_client is not None:
self._rest_client._close()
[docs] def register_schema(self, subject_name, schema, normalize_schemas=False):
"""
Registers a schema under ``subject_name``.
Args:
subject_name (str): subject to register a schema under
schema (Schema): Schema instance to register
Returns:
int: Schema id
Raises:
SchemaRegistryError: if Schema violates this subject's
Compatibility policy or is otherwise invalid.
See Also:
`POST Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions>`_
""" # noqa: E501
schema_id = self._cache.get_schema_id_by_subject(subject_name, schema)
if schema_id is not None:
return schema_id
request = {'schema': schema.schema_str}
# CP 5.5 adds new fields (for JSON and Protobuf).
if len(schema.references) > 0 or schema.schema_type != 'AVRO':
request['schemaType'] = schema.schema_type
request['references'] = [{'name': ref.name,
'subject': ref.subject,
'version': ref.version}
for ref in schema.references]
response = self._rest_client.post(
'subjects/{}/versions?normalize={}'.format(_urlencode(subject_name), normalize_schemas),
body=request)
schema_id = response['id']
self._cache.set(schema_id, schema, subject_name)
return schema_id
[docs] def get_schema(self, schema_id):
"""
Fetches the schema associated with ``schema_id`` from the
Schema Registry. The result is cached so subsequent attempts will not
require an additional round-trip to the Schema Registry.
Args:
schema_id (int): Schema id
Returns:
Schema: Schema instance identified by the ``schema_id``
Raises:
SchemaRegistryError: If schema can't be found.
See Also:
`GET Schema API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id>`_
""" # noqa: E501
schema = self._cache.get_schema(schema_id)
if schema is not None:
return schema
response = self._rest_client.get('schemas/ids/{}'.format(schema_id))
schema = Schema(schema_str=response['schema'],
schema_type=response.get('schemaType', 'AVRO'))
schema.references = [
SchemaReference(name=ref['name'], subject=ref['subject'], version=ref['version'])
for ref in response.get('references', [])
]
self._cache.set(schema_id, schema)
return schema
[docs] def lookup_schema(self, subject_name, schema, normalize_schemas=False):
"""
Returns ``schema`` registration information for ``subject``.
Args:
subject_name (str): Subject name the schema is registered under
schema (Schema): Schema instance.
Returns:
RegisteredSchema: Subject registration information for this schema.
Raises:
SchemaRegistryError: If schema or subject can't be found
See Also:
`POST Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions>`_
""" # noqa: E501
request = {'schema': schema.schema_str}
# CP 5.5 adds new fields (for JSON and Protobuf).
if len(schema.references) > 0 or schema.schema_type != 'AVRO':
request['schemaType'] = schema.schema_type
request['references'] = [{'name': ref.name,
'subject': ref.subject,
'version': ref.version}
for ref in schema.references]
response = self._rest_client.post('subjects/{}?normalize={}'
.format(_urlencode(subject_name), normalize_schemas),
body=request)
schema_type = response.get('schemaType', 'AVRO')
return RegisteredSchema(schema_id=response['id'],
schema=Schema(response['schema'],
schema_type,
[
SchemaReference(name=ref['name'],
subject=ref['subject'],
version=ref['version'])
for ref in response.get('references', [])
]),
subject=response['subject'],
version=response['version'])
[docs] def get_subjects(self):
"""
List all subjects registered with the Schema Registry
Returns:
list(str): Registered subject names
Raises:
SchemaRegistryError: if subjects can't be found
See Also:
`GET subjects API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions>`_
""" # noqa: E501
return self._rest_client.get('subjects')
[docs] def delete_subject(self, subject_name, permanent=False):
"""
Deletes the specified subject and its associated compatibility level if
registered. It is recommended to use this API only when a topic needs
to be recycled or in development environments.
Args:
subject_name (str): subject name
permanent (bool): True for a hard delete, False (default) for a soft delete
Returns:
list(int): Versions deleted under this subject
Raises:
SchemaRegistryError: if the request was unsuccessful.
See Also:
`DELETE Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)>`_
""" # noqa: E501
list = self._rest_client.delete('subjects/{}'
.format(_urlencode(subject_name)))
if permanent:
self._rest_client.delete('subjects/{}?permanent=true'
.format(_urlencode(subject_name)))
return list
[docs] def get_latest_version(self, subject_name):
"""
Retrieves latest registered version for subject
Args:
subject_name (str): Subject name.
Returns:
RegisteredSchema: Registration information for this version.
Raises:
SchemaRegistryError: if the version can't be found or is invalid.
See Also:
`GET Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)>`_
""" # noqa: E501
response = self._rest_client.get('subjects/{}/versions/{}'
.format(_urlencode(subject_name),
'latest'))
schema_type = response.get('schemaType', 'AVRO')
return RegisteredSchema(schema_id=response['id'],
schema=Schema(response['schema'],
schema_type,
[
SchemaReference(name=ref['name'],
subject=ref['subject'],
version=ref['version'])
for ref in response.get('references', [])
]),
subject=response['subject'],
version=response['version'])
[docs] def get_version(self, subject_name, version):
"""
Retrieves a specific schema registered under ``subject_name``.
Args:
subject_name (str): Subject name.
version (int): version number. Defaults to latest version.
Returns:
RegisteredSchema: Registration information for this version.
Raises:
SchemaRegistryError: if the version can't be found or is invalid.
See Also:
`GET Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)>`_
""" # noqa: E501
response = self._rest_client.get('subjects/{}/versions/{}'
.format(_urlencode(subject_name),
version))
schema_type = response.get('schemaType', 'AVRO')
return RegisteredSchema(schema_id=response['id'],
schema=Schema(response['schema'],
schema_type,
[
SchemaReference(name=ref['name'],
subject=ref['subject'],
version=ref['version'])
for ref in response.get('references', [])
]),
subject=response['subject'],
version=response['version'])
[docs] def get_versions(self, subject_name):
"""
Get a list of all versions registered with this subject.
Args:
subject_name (str): Subject name.
Returns:
list(int): Registered versions
Raises:
SchemaRegistryError: If subject can't be found
See Also:
`GET Subject Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions>`_
""" # noqa: E501
return self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)))
[docs] def delete_version(self, subject_name, version):
"""
Deletes a specific version registered to ``subject_name``.
Args:
subject_name (str) Subject name
version (int): Version number
Returns:
int: Version number which was deleted
Raises:
SchemaRegistryError: if the subject or version cannot be found.
See Also:
`Delete Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)-versions-(versionId-%20version)>`_
""" # noqa: E501
response = self._rest_client.delete('subjects/{}/versions/{}'.
format(_urlencode(subject_name),
version))
return response
[docs] def set_compatibility(self, subject_name=None, level=None):
"""
Update global or subject level compatibility level.
Args:
level (str): Compatibility level. See API reference for a list of
valid values.
subject_name (str, optional): Subject to update. Sets compatibility
level policy if not set.
Returns:
str: The newly configured compatibility level.
Raises:
SchemaRegistryError: If the compatibility level is invalid.
See Also:
`PUT Subject Compatibility API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#put--config-(string-%20subject)>`_
""" # noqa: E501
if level is None:
raise ValueError("level must be set")
if subject_name is None:
return self._rest_client.put('config',
body={'compatibility': level.upper()})
return self._rest_client.put('config/{}'
.format(_urlencode(subject_name)),
body={'compatibility': level.upper()})
[docs] def get_compatibility(self, subject_name=None):
"""
Get the current compatibility level.
Args:
subject_name (str, optional): Subject name. Returns global policy
if left unset.
Returns:
str: Compatibility level for the subject if set, otherwise the global compatibility level.
Raises:
SchemaRegistryError: if the request was unsuccessful.
See Also:
`GET Subject Compatibility API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--config-(string-%20subject)>`_
""" # noqa: E501
if subject_name is not None:
url = 'config/{}'.format(_urlencode(subject_name))
else:
url = 'config'
result = self._rest_client.get(url)
return result['compatibilityLevel']
[docs] def test_compatibility(self, subject_name, schema, version="latest"):
"""Test the compatibility of a candidate schema for a given subject and version
Args:
subject_name (str): Subject name the schema is registered under
schema (Schema): Schema instance.
version (int or str, optional): Version number, or the string "latest". Defaults to "latest".
Returns:
bool: True if the schema is compatible with the specified version
Raises:
SchemaRegistryError: if the request was unsuccessful.
See Also:
`POST Test Compatibility API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--compatibility-subjects-(string-%20subject)-versions-(versionId-%20version)>`_
""" # noqa: E501
request = {"schema": schema.schema_str}
if schema.schema_type != "AVRO":
request['schemaType'] = schema.schema_type
if schema.references:
request['references'] = [
{'name': ref.name, 'subject': ref.subject, 'version': ref.version}
for ref in schema.references
]
response = self._rest_client.post(
'compatibility/subjects/{}/versions/{}'.format(subject_name, version), body=request
)
return response['is_compatible']
[docs]class Schema(object):
"""
An unregistered Schema.
Args:
schema_str (str): String representation of the schema.
schema_type (str): The schema type: AVRO, PROTOBUF or JSON.
references ([SchemaReference]): SchemaReferences used in this schema.
"""
__slots__ = ['schema_str', 'schema_type', 'references', '_hash']
def __init__(self, schema_str, schema_type, references=[]):
super(Schema, self).__init__()
self.schema_str = schema_str
self.schema_type = schema_type
self.references = references
self._hash = hash(schema_str)
def __eq__(self, other):
return all([self.schema_str == other.schema_str,
self.schema_type == other.schema_type])
def __hash__(self):
return self._hash
[docs]class RegisteredSchema(object):
"""
Schema registration information.
Represents a Schema registered with a subject. Use this class when you need
a specific version of a subject such as forming a SchemaReference.
Args:
schema_id (int): Registered Schema id
schema (Schema): Registered Schema
subject (str): Subject this schema is registered under
version (int): Version of this subject this schema is registered to
"""
def __init__(self, schema_id, schema, subject, version):
self.schema_id = schema_id
self.schema = schema
self.subject = subject
self.version = version
class SchemaReference(object):
"""
Reference to a Schema registered with the Schema Registry.
As of Confluent Platform 5.5 Schema's may now hold references to other
registered schemas. As long as there is a references to a schema it may not
be deleted.
Args:
name (str): Schema name
subject (str): Subject this Schema is registered with
version (int): This Schema's version
"""
def __init__(self, name, subject, version):
super(SchemaReference, self).__init__()
self.name = name
self.subject = subject
self.version = version