-
Jorrit Schaap authoredJorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
json_utils.py 19.65 KiB
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import json
import time
import jsonschema
from copy import deepcopy
import requests
from datetime import datetime, timedelta
from .util import single_line_with_single_spaces
class JSONError(Exception):
    """Base exception raised by this module for JSON/JSON-schema handling errors
    (e.g. unfetchable schema urls, or ambiguous remote-reference resolution)."""
    pass
# default maximum age of an entry in the (optional) fetched-schema caches before it is re-fetched
DEFAULT_MAX_SCHEMA_CACHE_AGE = timedelta(minutes=1)
def _extend_with_default(validator_class):
    """
    Build a validator class whose "properties" validation also fills in missing properties
    with the defaults declared in the schema, and which seeds empty objects/arrays so that
    defaults nested inside them get populated too while traversing down.

    see: <https://python-jsonschema.readthedocs.io/en/stable/faq/#why-doesn-t-my-schema-s-default-property-set-the-default-on-my-instance>
    """
    original_properties_validator = validator_class.VALIDATORS["properties"]

    def _properties_with_defaults(validator, properties, instance, schema):
        for prop_name, prop_schema in properties.items():
            if "default" in prop_schema:
                instance.setdefault(prop_name, prop_schema["default"])
            elif "type" not in prop_schema:
                # could be anything, probably a $ref.
                pass
            elif prop_schema["type"] == "object":
                # seeding {} makes the normal property traversal fill in the object's own defaults
                instance.setdefault(prop_name, {})
            elif prop_schema["type"] == "array":
                # seeding [] makes the normal item traversal fill in the array's own defaults
                instance.setdefault(prop_name, [])

        # delegate to the stock "properties" validation, forwarding its errors
        yield from original_properties_validator(validator, properties, instance, schema)

    return jsonschema.validators.extend(validator_class,
                                        {"properties": _properties_with_defaults})
def _extend_with_required(validator_class):
    """
    Build a validator class whose "required" validation first fills in any missing required
    property that has a default declared in the schema.

    (Note: the check for required properties happens before property validation, so this is
    needed even though the override in _extend_with_default would add the property as well.)

    see: <https://python-jsonschema.readthedocs.io/en/stable/faq/#why-doesn-t-my-schema-s-default-property-set-the-default-on-my-instance>
    """
    original_required_validator = validator_class.VALIDATORS["required"]

    def _required_with_defaults(validator, required_names, instance, schema):
        for name in required_names:
            prop_schema = schema['properties'].get(name, {})
            if "default" in prop_schema:
                instance.setdefault(name, prop_schema["default"])

        # delegate to the stock "required" validation, forwarding its errors
        yield from original_required_validator(validator, required_names, instance, schema)

    return jsonschema.validators.extend(validator_class,
                                        {"required": _required_with_defaults})
# define a custom validator that fills in properties before validation
# (defaults are injected both for the "properties" and the "required" keywords)
_DefaultValidatingDraft6Validator = _extend_with_default(jsonschema.Draft6Validator)
_DefaultValidatingDraft6Validator = _extend_with_required(_DefaultValidatingDraft6Validator)

# storage for validators, for fast caching of ref resolved urls.
# one cache per flavour: plain validation vs. validation that also injects schema defaults.
_schema_validators = {}
_schema__defaults_addding_validators = {}  # NOTE(review): name contains typos ('__'/'addding'); private and used consistently, so left as-is
def get_validator_for_schema(schema: dict, add_defaults: bool=False):
    '''Return a json validator for the given schema (dict, or a json string which is parsed first).

    Validators are cached per schema '$id' — this saves many, many lookups and ref resolutions.
    The 'add_defaults' parameter selects a validator that fills in schema defaults while
    validating, or a plain Draft6 validator.'''
    if isinstance(schema, str):
        schema = json.loads(schema)

    # pick the cache and validator flavour matching the requested behaviour
    cache = _schema__defaults_addding_validators if add_defaults else _schema_validators
    make_validator = _DefaultValidatingDraft6Validator if add_defaults else jsonschema.Draft6Validator

    if '$id' in schema:
        cache_key = schema['$id']
        if cache_key not in cache:
            cache[cache_key] = make_validator(schema)
        validator = cache[cache_key]
    else:
        # no '$id': cannot cache, build a fresh validator
        validator = make_validator(schema)

    validator.schema = schema
    return validator
def get_default_json_object_for_schema(schema: str) -> dict:
    '''return a valid json object for the given schema with all properties with their default values

    NOTE(review): this function is redefined (and thus shadowed) by a later definition of the same
    name further down in this module; at import time the later one wins, so this body is effectively
    dead code — consider removing one of the two definitions.'''
    return add_defaults_to_json_object_for_schema({}, schema)
def add_defaults_to_json_object_for_schema(json_object: dict, schema: dict, cache: dict=None, max_cache_age: timedelta=DEFAULT_MAX_SCHEMA_CACHE_AGE) -> dict:
    '''Return a deep copy of the json object with defaults filled in according to the schema for all the missing properties.

    :param json_object: the json document to fill in (not modified; a copy is returned)
    :param schema: the json schema as dict (the original annotation said str, but the code reads schema['$id'])
    :param cache: optional dict used by the remote-$ref resolution to cache fetched schemas
    :param max_cache_age: maximum age of a cache entry before the schema is re-fetched
    :return: a new dict: json_object plus all schema defaults (and a '$schema' property when missing)
    '''
    copy_of_json_object = deepcopy(json_object)

    # add a $schema to the json doc if needed, pointing at the schema's own '$id'
    if '$schema' not in copy_of_json_object and '$id' in schema:
        copy_of_json_object['$schema'] = schema['$id']

    # resolve $refs to fill in defaults for those, too
    schema = resolved_remote_refs(schema, cache=cache, max_cache_age=max_cache_age)

    # run validator, which populates the properties with defaults.
    get_validator_for_schema(schema, add_defaults=True).validate(copy_of_json_object)
    return copy_of_json_object
def replace_host_in_urls(schema, new_base_url: str, keys=['$id', '$ref', '$schema']):
    '''Return a copy of the given schema with all url values stored under the given keys rewritten
    so their scheme+host part is replaced by new_base_url (path and '#'-fragment are kept).

    Urls pointing at json-schema.org (the meta-schemas) are left untouched; non-dict/list values
    are returned as-is.

    :param schema: a json schema (dict/list/plain value); traversed recursively
    :param new_base_url: the replacement base url, e.g. "http://newhost.example.com" (trailing '/' ignored)
    :param keys: the dict keys whose string values are treated as urls
    :return: a new schema structure with the rewritten urls
    '''
    if isinstance(schema, dict):
        updated_schema = {}
        for key, value in schema.items():
            if key in keys:
                if isinstance(value, str) and (value.startswith('http://') or value.startswith('https://')) and 'json-schema.org' not in value:
                    try:
                        # deconstruct the old url into the url proper and the '#...' fragment
                        head, anchor, tail = value.partition('#')
                        # split off the 'scheme://host' prefix, keeping only the path.
                        # BUGFIX: the previous code used str.lstrip('http://'), which strips a
                        # *character set* (not a prefix) and corrupted 'https://' urls and
                        # hosts/paths starting with those characters.
                        host_and_path = head.partition('://')[2]
                        path = host_and_path.partition('/')[2]
                        # and reconstruct the proper new url
                        updated_schema[key] = (new_base_url.rstrip('/') + '/' + path + anchor + tail.rstrip('/')).replace(' ', '%20')
                    except Exception:
                        # just accept the original value and assume that the user uploaded a proper schema
                        updated_schema[key] = value
                else:
                    updated_schema[key] = value
            else:
                updated_schema[key] = replace_host_in_urls(value, new_base_url, keys)
        return updated_schema

    if isinstance(schema, list):
        return [replace_host_in_urls(item, new_base_url, keys) for item in schema]

    return schema
def get_sub_schema(schema: dict, reference: str, default=None):
    '''Resolve a local JSON reference (e.g. "#/definitions/foo" or "/definitions/foo") in the
    schema and return the corresponding subschema.

    Generalized: numeric path segments are used as list indices when the current node is a list,
    and any unresolvable path (missing key, bad index, non-container node) yields the default
    instead of raising.

    :param schema: the (root) schema to look in
    :param reference: the local reference; a leading '#' and surrounding '/' are ignored
    :param default: returned when the reference cannot be resolved
    :return: the referenced subschema, or default
    '''
    parts = reference.lstrip('#').strip('/').split('/')
    if parts == ['']:
        # reference to root
        return schema

    try:
        subschema = schema
        for part in parts:
            if isinstance(subschema, list):
                # JSON-pointer style: numeric segments index into lists
                subschema = subschema[int(part)]
            else:
                subschema = subschema[part]
        return subschema
    except (KeyError, IndexError, ValueError, TypeError):
        # missing key, bad/out-of-range index, or node is not a container
        return default
def write_at_path(schema: dict, path: str, value):
    '''Store the given value (list/dict/plain) in the schema at the given nested path
    (e.g. "#/definitions/foo/bar"), creating intermediate dicts as needed.

    NOTE: a value that already exists at the final path segment is left untouched (no overwrite).
    '''
    segments = path.lstrip('#').strip('/').split('/')
    last_index = len(segments) - 1

    node = schema
    for index, segment in enumerate(segments):
        if segment not in node:
            # create a missing intermediate dict, or place the value at the leaf
            node[segment] = {} if index < last_index else value
        node = node[segment]
def replace_local_refs(schema: dict, path: str):
    '''Rewrite, in-place, every local (non-http) '$ref' value found anywhere in the given schema,
    splicing the given path in at the point where the ref and the path diverge.

    Example: a $ref "#/definitions/foo" combined with path "#/schemas/base" becomes
    "#/schemas/base/definitions/foo".

    NOTE(review): when the ref is a strict prefix of the path (or vice versa) the zip below is
    exhausted without hitting the divergence branch, so only the shared prefix survives —
    TODO confirm that this is the intended behaviour.'''
    if isinstance(schema, list):
        # recurse over each item in the list
        for item in schema:
            replace_local_refs(item, path)

    if isinstance(schema, dict):
        for key, value in list(schema.items()):
            if key == '$ref' and isinstance(value, str) and not value.startswith('http'):
                current_ref_parts = value.lstrip('#').strip('/').split('/')
                path_parts = path.lstrip('#').strip('/').split('/')
                new_ref_parts = []
                # walk ref and path in lockstep: keep the shared leading parts, and at the first
                # divergence splice in the rest of the path followed by the rest of the ref.
                for i, (current_part, path_part) in enumerate(zip(current_ref_parts, path_parts)):
                    if current_part==path_part:
                        new_ref_parts.append(path_part)
                    else:
                        new_ref_parts.extend(path_parts[i:])
                        new_ref_parts.extend(current_ref_parts[i:])
                        break
                new_ref_value = '#/'+'/'.join(new_ref_parts)
                schema[key] = new_ref_value

            # recurse over each value in the dict
            replace_local_refs(value, path)
def _fetch_url(url: str) -> str:
    '''Fetch the document at the given url and return its body text.

    Retries a few times (jsonschema.org is down quite often, but only for a brief moment),
    sleeping briefly between attempts.

    :raises JSONError: when the url could not be fetched after all attempts.
    '''
    MAX_ATTEMPTS = 5
    for attempt_nr in range(MAX_ATTEMPTS):
        try:
            # use a timeout so a stuck/unresponsive server cannot block us indefinitely
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                return response.text
        except requests.exceptions.RequestException:
            pass
        # retry after a little sleep — previously only exceptions slept, so non-200
        # responses were retried in a tight loop, hammering the server
        if attempt_nr + 1 < MAX_ATTEMPTS:
            time.sleep(2)
    raise JSONError("Could not get: %s" % (url,))
def _get_referenced_definition(ref_url, cache: dict=None, max_cache_age: timedelta=DEFAULT_MAX_SCHEMA_CACHE_AGE):
    '''fetch the schema that the remote ref_url points at, and return a tuple of
    (now-local-ref, definition-subschema)'''
    full_schema = _get_referenced_schema(ref_url, cache=cache, max_cache_age=max_cache_age)

    # the part after the '#' is both the new local ref and the path of the definition within the schema
    local_ref = ref_url.partition('#')[2]

    return local_ref, get_sub_schema(full_schema, local_ref)
def _get_referenced_schema(ref_url, cache: dict=None, max_cache_age: timedelta=DEFAULT_MAX_SCHEMA_CACHE_AGE):
    '''fetch the schema given by the ref_url, and return it.
    When a cache dict is given it is used/updated, keyed on the url without its '#...' fragment;
    cached entries older than max_cache_age are re-fetched.'''
    # the cache key is the url without its '#...' fragment
    url_without_fragment = ref_url.partition('#')[0]

    def _fetch_and_update_cache():
        fetched_schema = json.loads(_fetch_url(ref_url))
        if isinstance(cache, dict):
            cache[url_without_fragment] = fetched_schema, datetime.utcnow()
        return fetched_schema

    if isinstance(cache, dict) and url_without_fragment in cache:
        cached_schema, cached_at = cache[url_without_fragment]
        # refresh the cache entry when it is outdated
        if datetime.utcnow() - cached_at > max_cache_age:
            return _fetch_and_update_cache()
        # cached entry is fresh enough
        return cached_schema

    # not cached (or no cache given): fetch, storing in the cache when possible
    return _fetch_and_update_cache()
def resolved_remote_refs(schema, cache: dict=None, max_cache_age: timedelta=DEFAULT_MAX_SCHEMA_CACHE_AGE):
    '''return the given schema with all remote $ref fields (to http://my.server.com/my/schema/#/definitions/...) replaced by the local $ref pointers to #/definitions/...

    The referenced definitions are fetched (via the cache, when given) and inlined into the
    returned copy's own 'definitions' section, making the result self-contained.

    :param schema: the json schema (dict) to resolve
    :param cache: optional dict mapping schema-url -> (schema, fetch-timestamp); created locally when None
    :param max_cache_age: maximum age of a cache entry before the schema is re-fetched
    :raises JSONError: when a fetched definition conflicts with a different definition already present at the same local path
    '''
    if cache is None:
        cache = {}

    # make a copy, which can be updated with resolved refs, and then be returned
    copy_of_schema = deepcopy(schema)

    # make sure we have a 'definitions' section
    if 'definitions' not in copy_of_schema:
        copy_of_schema['definitions'] = {}

    # helper function to resolve the refs of the given sub_schema.
    # adds the resolved refs to the definitions of copy_of_schema (the root of the result).
    def _recursive_resolved_remote_refs(sub_schema):
        if isinstance(sub_schema, list):
            # recurse over each item in the list
            return [_recursive_resolved_remote_refs(item) for item in sub_schema]

        if isinstance(sub_schema, dict):
            for key in list(sub_schema.keys()):
                # if the key is a remote ref,
                # then fetch the definition
                # and change it into a local definition and ref
                # and store it in the root_schema_definitions
                if key=="$ref" and isinstance(sub_schema['$ref'], str) and sub_schema['$ref'].startswith('http'):
                    # this is a truly remote reference to another schema
                    # resolve remote reference
                    ref_full_url = sub_schema['$ref']

                    # deduct and construct a replacement local_ref for the ref_url
                    schema_url, anchor, local_ref = ref_full_url.partition('#')
                    schema_url = schema_url.rstrip('/')
                    local_ref = '#/'+local_ref.lstrip('/')

                    # replace remote ref by new local_ref
                    sub_schema['$ref'] = local_ref

                    # remember any definition already present at local_ref, for the ambiguity check below
                    current_definition = get_sub_schema(copy_of_schema, local_ref, None)

                    # fetch the remote schema...
                    referenced_schema = _get_referenced_schema(schema_url, cache=cache, max_cache_age=max_cache_age)
                    # recurse, thus resolving the remote refs in the referenced schema
                    referenced_schema = resolved_remote_refs(referenced_schema, cache, max_cache_age)
                    resolved_definition = get_sub_schema(referenced_schema, local_ref, None)

                    # two *different* definitions for the same local path cannot be merged: bail out
                    if current_definition is not None and current_definition != resolved_definition:
                        msg = "ambiguity while resolving remote references in schema $id='%s' $ref='%s' definition1='%s' definition2='%s'" % (schema.get('$id', '<no_id>'), local_ref, single_line_with_single_spaces(current_definition), single_line_with_single_spaces(resolved_definition))
                        raise JSONError(msg)

                    # inline the resolved definition at its new local path
                    write_at_path(copy_of_schema, local_ref, resolved_definition)

                    # also copy every definition the referenced schema itself refers to,
                    # so the inlined definition's own (local) refs keep resolving
                    for ref in get_refs(referenced_schema):
                        resolved_definition = get_sub_schema(referenced_schema, ref, None)
                        write_at_path(copy_of_schema, ref, resolved_definition)
                else:
                    # key is not a (remote) $ref,
                    # just copy a recursively resolved key/value into the sub_schema
                    sub_schema[key] = _recursive_resolved_remote_refs(sub_schema[key])

        # sub_schema is not a list or dict, so no need to resolve anything, just return it.
        return sub_schema

    # use the recursive helper method to replace the remote refs
    _recursive_resolved_remote_refs(copy_of_schema)
    return copy_of_schema
def resolved_local_refs(schema, root_schema: dict=None):
    '''return the given schema with all local $ref fields (to #/definitions/...) replaced by the referred definition that they point to.

    :param schema: the (sub)schema currently being resolved
    :param root_schema: the schema in which the local refs are looked up; defaults to the given schema itself

    NOTE(review): when a dict has sibling keys next to "$ref", those siblings are written into the
    object returned by get_sub_schema, which is the *same object* as the one inside root_schema —
    so root_schema can be mutated as a side effect. Confirm whether callers rely on this before changing it.'''
    if root_schema is None:
        # the top-level call: the given schema is also the lookup root for all nested refs
        root_schema = schema

    if isinstance(schema, dict):
        updated_schema = {}
        keys = list(schema.keys())
        if "$ref" in keys and isinstance(schema['$ref'], str):
            ref = schema['$ref']
            if ref.startswith('#/'):
                # resolve local reference, a-la "#/definitions/foo"
                updated_schema = get_sub_schema(root_schema, ref[1:])
                keys.remove("$ref")
        # copy (and recursively resolve) all remaining keys alongside the resolved ref (if any)
        for key in keys:
            updated_schema[key] = resolved_local_refs(schema[key], root_schema=root_schema)
        return updated_schema

    if isinstance(schema, list):
        return [resolved_local_refs(item, root_schema=root_schema) for item in schema]

    # plain value: nothing to resolve
    return schema
def get_refs(schema) -> set:
    '''return the set of all $ref values occurring anywhere in the given schema'''
    found = set()

    if isinstance(schema, dict):
        for key, value in schema.items():
            if key == "$ref" and isinstance(value, str):
                found.add(value)
            else:
                # descend into the value; non-container values yield nothing
                found |= get_refs(value)
    elif isinstance(schema, list):
        for element in schema:
            found |= get_refs(element)

    return found
def validate_json_against_its_schema(json_object: dict, cache: dict=None, max_cache_age: timedelta=DEFAULT_MAX_SCHEMA_CACHE_AGE):
    '''validate the given json object against its own schema, i.e. the schema that its
    '$schema' property points to (fetched via url, using the cache when given).'''
    referenced_schema = _get_referenced_schema(json_object['$schema'],
                                               cache=cache,
                                               max_cache_age=max_cache_age)
    return validate_json_against_schema(json_object, referenced_schema,
                                        cache=cache, max_cache_age=max_cache_age)
def validate_json_against_schema(json_string: str, schema: str, cache: dict=None, max_cache_age: timedelta=DEFAULT_MAX_SCHEMA_CACHE_AGE):
    '''validate the given json_string against the given schema.
    If no exception is thrown, then the given json_string validates against the given schema.
    (Both arguments may be given as json strings or as already-parsed objects.)

    :param cache: optional dict used to cache remotely fetched schemas
    :param max_cache_age: maximum age of a cache entry before the schema is re-fetched
    :raises jsonschema.exceptions.ValidationError: if either argument is not valid json,
            or if the json_string does not validate against the schema
    '''
    # ensure the given arguments are strings (accept dicts/other objects as well)
    if not isinstance(json_string, str):
        json_string = json.dumps(json_string)
    if not isinstance(schema, str):
        schema = json.dumps(schema)

    # ensure the specification and schema are both valid json in the first place
    try:
        json_object = json.loads(json_string)
    except json.decoder.JSONDecodeError as e:
        raise jsonschema.exceptions.ValidationError("Invalid JSON: %s\n%s" % (str(e), json_string)) from e

    try:
        schema_object = json.loads(schema)
    except json.decoder.JSONDecodeError as e:
        raise jsonschema.exceptions.ValidationError("Invalid JSON: %s\n%s" % (str(e), schema)) from e

    # resolve remote $refs so definitions in referenced schemas are validated, too
    schema_object = resolved_remote_refs(schema_object, cache=cache, max_cache_age=max_cache_age)

    # now do the actual validation
    try:
        validate_json_object_with_schema(json_object, schema_object)
    except jsonschema.ValidationError as e:
        # re-raise with a flattened message, keeping the original error as cause
        raise jsonschema.exceptions.ValidationError(str(e)) from e
def get_default_json_object_for_schema(schema: dict, cache: dict=None, max_cache_age: timedelta=DEFAULT_MAX_SCHEMA_CACHE_AGE) -> dict:
    """
    TMSS wrapper for TMSS 'add_defaults_to_json_object_for_schema'

    NOTE(review): this redefines (and thus shadows) the earlier get_default_json_object_for_schema
    in this module — consider removing one of the two definitions.

    :param schema: the json schema as dict (the original annotation said str, but the code reads schema['$id'])
    :param cache: optional dict used to cache remotely fetched schemas
    :param max_cache_age: maximum age of a cache entry before the schema is re-fetched
    :return: json_object with default values of the schema (and '$schema' set to the schema's '$id' when present)
    """
    data = add_defaults_to_json_object_for_schema({}, schema, cache=cache, max_cache_age=max_cache_age)
    if '$id' in schema:
        data['$schema'] = schema['$id']
    return data
def raise_on_self_refs(schema: dict):
    '''raise a JSONError if the given schema contains any (remote/http) $ref pointing back to the schema itself'''
    own_id = schema.get('$id', '')   # renamed from 'id' to avoid shadowing the builtin
    if not own_id.startswith('http'):
        return

    # normalize: remove any trailing slashes or hashes
    own_id = own_id.rstrip('/').rstrip('#').rstrip('/')

    for ref in get_refs(schema):
        if ref.startswith(own_id):
            raise JSONError("schema $id='%s' contains a $ref to itself: '%s'" % (own_id, ref))
def validate_json_object_with_schema(json_object, schema):
    """
    Validate the given json_object against the given schema (plain validation, no defaults injected).
    :raises jsonschema.exceptions.ValidationError: when the object does not validate against the schema.
    """
    get_validator_for_schema(schema, add_defaults=False).validate(json_object)