# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

import json
import time
from copy import deepcopy
from datetime import datetime, timedelta

import jsonschema
import requests

from .util import single_line_with_single_spaces


class JSONError(Exception):
    """Raised for JSON/jsonschema handling errors, e.g. unfetchable remote schemas."""
    pass


# how long a cached (remote) schema is considered fresh before it is re-fetched
DEFAULT_MAX_SCHEMA_CACHE_AGE = timedelta(minutes=1)


def _extend_with_default(validator_class):
    """ Extend the properties validation so that it adds missing properties with their
    default values (where one is defined in the schema). Objects and arrays without an
    explicit default get {} / [] so validation recurses into them and fills nested
    defaults as well.
    see: <https://python-jsonschema.readthedocs.io/en/stable/faq/#why-doesn-t-my-schema-s-default-property-set-the-default-on-my-instance>
    """
    validate_properties = validator_class.VALIDATORS["properties"]

    def set_defaults(validator, properties, instance, schema):
        for property, subschema in properties.items():
            if "default" in subschema:
                instance.setdefault(property, subschema["default"])
            elif "type" not in subschema:
                # could be anything, probably a $ref.
                pass
            elif subschema["type"] == "object":
                # giving objects the {} default causes that default to be populated by the properties of the object
                instance.setdefault(property, {})
            elif subschema["type"] == "array":
                # giving arrays the [] default causes that default to be populated by the items of the array
                instance.setdefault(property, [])

        # chain up to the original 'properties' validation
        for error in validate_properties(validator, properties, instance, schema):
            yield error

    return jsonschema.validators.extend(validator_class, {"properties": set_defaults})


def _extend_with_required(validator_class):
    """ Extend the required properties validation so that it adds missing required
    properties with their default values (where one is defined in the schema).
    (Note: the check for required properties happens before property validation,
    so this is required even though the override in _extend_with_default would as
    well add the property.)
    see: <https://python-jsonschema.readthedocs.io/en/stable/faq/#why-doesn-t-my-schema-s-default-property-set-the-default-on-my-instance>
    """
    validate_required = validator_class.VALIDATORS["required"]

    def set_required_properties(validator, properties, instance, schema):
        for property in properties:
            subschema = schema['properties'].get(property, {})
            if "default" in subschema:
                instance.setdefault(property, subschema["default"])

        # chain up to the original 'required' validation
        for error in validate_required(validator, properties, instance, schema):
            yield error

    return jsonschema.validators.extend(validator_class, {"required": set_required_properties})


# define a custom validator that fills in properties before validation
_DefaultValidatingDraft6Validator = _extend_with_default(jsonschema.Draft6Validator)
_DefaultValidatingDraft6Validator = _extend_with_required(_DefaultValidatingDraft6Validator)

# storage for validators, for fast caching of ref resolved urls.
_schema_validators = {}
_schema__defaults_addding_validators = {}


def get_validator_for_schema(schema: dict, add_defaults: bool = False):
    '''get a json validator for the given schema.
    If the schema is already known in the cache by its $id, then the validator
    from the cache is returned. This saves many many lookups and ref resolving.
    the 'add_defaults' parameter indicates if we want the validator to add defaults
    while validating or not.'''
    if isinstance(schema, str):
        schema = json.loads(schema)

    validators_cache = _schema__defaults_addding_validators if add_defaults else _schema_validators

    if '$id' in schema:
        if schema['$id'] not in validators_cache:
            validators_cache[schema['$id']] = _DefaultValidatingDraft6Validator(schema) if add_defaults else jsonschema.Draft6Validator(schema=schema)
        validator = validators_cache[schema['$id']]
    else:
        # no $id to cache by; build a one-off validator
        validator = _DefaultValidatingDraft6Validator(schema) if add_defaults else jsonschema.Draft6Validator(schema=schema)

    validator.schema = schema
    return validator


def add_defaults_to_json_object_for_schema(json_object: dict, schema: dict, cache: dict = None,
                                           max_cache_age: timedelta = DEFAULT_MAX_SCHEMA_CACHE_AGE) -> dict:
    '''return a copy of the json object with defaults filled in according to the schema
    for all the missing properties'''
    copy_of_json_object = deepcopy(json_object)

    # add a $schema to the json doc if needed
    if '$schema' not in copy_of_json_object and '$id' in schema:
        copy_of_json_object['$schema'] = schema['$id']

    # resolve $refs to fill in defaults for those, too
    schema = resolved_remote_refs(schema, cache=cache, max_cache_age=max_cache_age)

    # run validator, which populates the properties with defaults.
    get_validator_for_schema(schema, add_defaults=True).validate(copy_of_json_object)
    return copy_of_json_object


def replace_host_in_urls(schema, new_base_url: str, keys=('$id', '$ref', '$schema')):
    '''return the given schema with all URL fields in the given keys re-rooted onto
    the given new_base_url (the path and #fragment are kept). URLs referring to
    json-schema.org are left untouched.'''
    if isinstance(schema, dict):
        updated_schema = {}
        for key, value in schema.items():
            if key in keys:
                if isinstance(value, str) and (value.startswith('http://') or value.startswith('https://')) and 'json-schema.org' not in value:
                    try:
                        # deconstruct path from old url
                        head, anchor, tail = value.partition('#')
                        # NOTE: use partition('://') instead of lstrip('http://'):
                        # lstrip strips *characters*, not a prefix, and would eat
                        # leading h/t/p/s characters of the hostname.
                        scheme, separator, remainder = head.partition('://')
                        host, slash, path = remainder.partition('/')
                        # and reconstruct the proper new url
                        updated_schema[key] = (new_base_url.rstrip('/') + '/' + path + anchor + tail.rstrip('/')).replace(' ', '%20')
                    except Exception:
                        # just accept the original value and assume that the user uploaded a proper schema
                        updated_schema[key] = value
                else:
                    updated_schema[key] = value
            else:
                updated_schema[key] = replace_host_in_urls(value, new_base_url, keys)
        return updated_schema

    if isinstance(schema, list):
        return [replace_host_in_urls(item, new_base_url, keys) for item in schema]

    return schema


def get_sub_schema(schema: dict, reference: str, default=None):
    '''resolve a JSON reference (f.e. /definitions/foo) in the schema and return
    the corresponding subschema, or the given default when the path does not exist.'''
    parts = reference.lstrip('#').strip('/').split('/')
    if parts == ['']:
        # reference to root
        return schema

    try:
        subschema = schema
        for part in parts:
            subschema = subschema[part]
        return subschema
    except (KeyError, IndexError, TypeError):
        # missing key, or a path part applied to a non-dict subschema
        return default


def write_at_path(schema: dict, path: str, value):
    '''write the given value (list/dict/plain) at the given (nested) path
    (f.e. #/definitions/foo/bar) in the schema, creating intermediate dicts as needed.'''
    path_parts = path.lstrip('#').strip('/').split('/')
    nested_schema = schema
    for i, path_part in enumerate(path_parts):
        if path_part not in nested_schema:
            if i < len(path_parts) - 1:
                # intermediate level: create an empty container to descend into
                nested_schema[path_part] = {}
            else:
                # final level: store the value
                nested_schema[path_part] = value
        nested_schema = nested_schema[path_part]


def replace_local_refs(schema: dict, path: str):
    '''rewrite (in place) all local (non-http) $ref values in the schema so they are
    relative to the given path, merging the common leading path parts.'''
    if isinstance(schema, list):
        # recurse over each item in the list
        for item in schema:
            replace_local_refs(item, path)

    if isinstance(schema, dict):
        for key, value in list(schema.items()):
            if key == '$ref' and isinstance(value, str) and not value.startswith('http'):
                current_ref_parts = value.lstrip('#').strip('/').split('/')
                path_parts = path.lstrip('#').strip('/').split('/')
                new_ref_parts = []
                for i, (current_part, path_part) in enumerate(zip(current_ref_parts, path_parts)):
                    if current_part == path_part:
                        # common prefix: keep as-is
                        new_ref_parts.append(path_part)
                    else:
                        # first divergence: splice in the remainder of both paths
                        new_ref_parts.extend(path_parts[i:])
                        new_ref_parts.extend(current_ref_parts[i:])
                        break
                new_ref_value = '#/' + '/'.join(new_ref_parts)
                schema[key] = new_ref_value

            # recurse over each value in the dict
            replace_local_refs(value, path)


def _fetch_url(url: str) -> str:
    '''try to obtain the provided URL, retrying a few times
    (jsonschema.org is down quite often, but only for a brief moment).
    :raises JSONError when the url could not be fetched after all retries.'''
    for attempt_nr in range(5):
        try:
            response = requests.get(url)
            if response.status_code == 200:
                return response.text
        except requests.exceptions.RequestException:
            pass
        if attempt_nr < 4:
            # back off before every retry (also on non-200 responses,
            # to avoid hammering an already-struggling server)
            time.sleep(2)

    raise JSONError("Could not get: %s" % (url,))


def _get_referenced_definition(ref_url, cache: dict = None, max_cache_age: timedelta = DEFAULT_MAX_SCHEMA_CACHE_AGE):
    '''fetch the schema given by the remote ref_url, and return a tuple of the
    now-local-ref, and the definition sub-schema'''
    referenced_schema = _get_referenced_schema(ref_url, cache=cache, max_cache_age=max_cache_age)

    # deduct referred schema name and version from ref-value
    head, anchor, tail = ref_url.partition('#')
    # extract the definition sub-schema
    definition = get_sub_schema(referenced_schema, tail)

    return tail, definition


def _get_referenced_schema(ref_url, cache: dict = None, max_cache_age: timedelta = DEFAULT_MAX_SCHEMA_CACHE_AGE):
    '''fetch the schema given by the ref_url, and return it.
    When a cache dict is given, it is consulted first and refreshed when the
    cached entry is older than max_cache_age.'''
    # deduct referred schema name and version from ref-value
    head, anchor, tail = ref_url.partition('#')

    def _fetch_url_and_update_cache_entry_if_needed():
        referenced_schema = json.loads(_fetch_url(ref_url))
        if isinstance(cache, dict):
            cache[head] = referenced_schema, datetime.utcnow()
        return referenced_schema

    if isinstance(cache, dict) and head in cache:
        # use cached value
        referenced_schema, last_update_timestamp = cache[head]

        # refresh cache if outdated
        if datetime.utcnow() - last_update_timestamp > max_cache_age:
            referenced_schema = _fetch_url_and_update_cache_entry_if_needed()
    else:
        # fetch url, and store in cache
        referenced_schema = _fetch_url_and_update_cache_entry_if_needed()

    return referenced_schema


def resolved_remote_refs(schema, cache: dict = None, max_cache_age: timedelta = DEFAULT_MAX_SCHEMA_CACHE_AGE):
    '''return the given schema with all remote $ref fields
    (to http://my.server.com/my/schema/#/definitions/...) replaced by local $ref
    pointers to #/definitions/... The referenced definitions are fetched (with
    caching) and copied into this schema's own definitions section.
    :raises JSONError when two conflicting definitions resolve to the same local ref.'''
    if cache is None:
        cache = {}

    # make a copy, which can be updated with resolved refs, and then be returned
    copy_of_schema = deepcopy(schema)

    # make sure we have a 'definitions' section
    if 'definitions' not in copy_of_schema:
        copy_of_schema['definitions'] = {}

    # helper function to resolve the refs of the given sub_schema.
    # adds the resolved refs to the copy_of_schema's definitions.
    def _recursive_resolved_remote_refs(sub_schema):
        if isinstance(sub_schema, list):
            # recurse over each item in the list
            return [_recursive_resolved_remote_refs(item) for item in sub_schema]

        if isinstance(sub_schema, dict):
            for key in list(sub_schema.keys()):
                # if the key is a remote ref,
                # then fetch the definition
                # and change it into a local definition and ref
                if key == "$ref" and isinstance(sub_schema['$ref'], str) and sub_schema['$ref'].startswith('http'):
                    # this is a truly remote reference to another schema
                    ref_full_url = sub_schema['$ref']

                    # deduct and construct a replacement local_ref for the ref_url
                    schema_url, anchor, local_ref = ref_full_url.partition('#')
                    schema_url = schema_url.rstrip('/')
                    local_ref = '#/' + local_ref.lstrip('/')

                    # replace remote ref by new local_ref
                    sub_schema['$ref'] = local_ref

                    current_definition = get_sub_schema(copy_of_schema, local_ref, None)

                    # fetch the remote schema...
                    referenced_schema = _get_referenced_schema(schema_url, cache=cache, max_cache_age=max_cache_age)
                    # recurse, thus resolving the remote refs in the referenced schema
                    referenced_schema = resolved_remote_refs(referenced_schema, cache, max_cache_age)
                    resolved_definition = get_sub_schema(referenced_schema, local_ref, None)

                    # guard against two different remote definitions landing on the same local ref
                    if current_definition is not None and current_definition != resolved_definition:
                        msg = "ambiguity while resolving remote references in schema $id='%s' $ref='%s' definition1='%s' definition2='%s'" % (schema.get('$id', '<no_id>'),
                                                                                                                                             local_ref,
                                                                                                                                             single_line_with_single_spaces(current_definition),
                                                                                                                                             single_line_with_single_spaces(resolved_definition))
                        raise JSONError(msg)

                    write_at_path(copy_of_schema, local_ref, resolved_definition)

                    # also copy over every definition the referenced schema itself refers to
                    for ref in get_refs(referenced_schema):
                        resolved_definition = get_sub_schema(referenced_schema, ref, None)
                        write_at_path(copy_of_schema, ref, resolved_definition)
                else:
                    # key is not a (remote) $ref,
                    # just copy a recursively resolved key/value into the sub_schema
                    sub_schema[key] = _recursive_resolved_remote_refs(sub_schema[key])

        # sub_schema is not a list or dict, so no need to resolve anything, just return it.
        return sub_schema

    # use the recursive helper method to replace the remote refs
    _recursive_resolved_remote_refs(copy_of_schema)
    return copy_of_schema


def resolved_local_refs(schema, root_schema: dict = None):
    '''return the given schema with all local $ref fields (to #/definitions/...)
    replaced by the referred definition that they point to.'''
    if root_schema is None:
        # at top-level entry the schema itself is the root to resolve against
        root_schema = schema

    if isinstance(schema, dict):
        updated_schema = {}
        keys = list(schema.keys())
        if "$ref" in keys and isinstance(schema['$ref'], str):
            ref = schema['$ref']
            if ref.startswith('#/'):
                # resolve local reference, a-la "#/definitions/foo"
                updated_schema = get_sub_schema(root_schema, ref[1:])
            keys.remove("$ref")
        for key in keys:
            updated_schema[key] = resolved_local_refs(schema[key], root_schema=root_schema)
        return updated_schema

    if isinstance(schema, list):
        return [resolved_local_refs(item, root_schema=root_schema) for item in schema]

    return schema


def get_refs(schema) -> set:
    '''return a set of all $refs in the schema'''
    refs = set()
    if isinstance(schema, dict):
        for key, value in schema.items():
            if key == "$ref" and isinstance(value, str):
                refs.add(value)
            else:
                refs.update(get_refs(value))
    if isinstance(schema, list):
        for item in schema:
            refs.update(get_refs(item))
    return refs


def validate_json_against_its_schema(json_object: dict, cache: dict = None, max_cache_age: timedelta = DEFAULT_MAX_SCHEMA_CACHE_AGE):
    '''validate the given json object against its own schema
    (the URI/URL that its property $schema points to)'''
    schema_url = json_object['$schema']
    referenced_schema = _get_referenced_schema(schema_url, cache=cache, max_cache_age=max_cache_age)
    return validate_json_against_schema(json_object, referenced_schema, cache=cache, max_cache_age=max_cache_age)


def validate_json_against_schema(json_string: str, schema: str, cache: dict = None, max_cache_age: timedelta = DEFAULT_MAX_SCHEMA_CACHE_AGE):
    '''validate the given json_string against the given schema.
    If no exception is thrown, then the given json_string validates against the given schema.
    :raises jsonschema.exceptions.ValidationError if the json_string does not validate against the schema
    '''
    # ensure the given arguments are strings
    if not isinstance(json_string, str):
        json_string = json.dumps(json_string)
    if not isinstance(schema, str):
        schema = json.dumps(schema)

    # ensure the specification and schema are both valid json in the first place
    try:
        json_object = json.loads(json_string)
    except json.decoder.JSONDecodeError as e:
        raise jsonschema.exceptions.ValidationError("Invalid JSON: %s\n%s" % (str(e), json_string))

    try:
        schema_object = json.loads(schema)
    except json.decoder.JSONDecodeError as e:
        raise jsonschema.exceptions.ValidationError("Invalid JSON: %s\n%s" % (str(e), schema))

    # resolve $refs to fill in defaults for those, too
    schema_object = resolved_remote_refs(schema_object, cache=cache, max_cache_age=max_cache_age)

    # now do the actual validation
    try:
        validate_json_object_with_schema(json_object, schema_object)
    except jsonschema.ValidationError as e:
        raise jsonschema.exceptions.ValidationError(str(e))


# NOTE(review): this module originally defined get_default_json_object_for_schema twice;
# the earlier (cache-less) definition was dead code shadowed by this one and has been removed.
def get_default_json_object_for_schema(schema: str, cache: dict = None, max_cache_age: timedelta = DEFAULT_MAX_SCHEMA_CACHE_AGE) -> dict:
    """
    TMSS wrapper for TMSS 'add_defaults_to_json_object_for_schema'
    :param schema: the schema to create a default json object for
    :return: json_object with default values of the schema
    """
    data = add_defaults_to_json_object_for_schema({}, schema, cache=cache, max_cache_age=max_cache_age)
    if '$id' in schema:
        data['$schema'] = schema['$id']
    return data


def raise_on_self_refs(schema: dict):
    '''raise a JSONError if the given schema contains any (remote/http) reference to itself'''
    schema_id = schema.get('$id', '')
    if schema_id.startswith('http'):
        # remove any trailing slashes or hashes
        schema_id = schema_id.rstrip('/').rstrip('#').rstrip('/')

        for ref in get_refs(schema):
            if ref.startswith(schema_id):
                raise JSONError("schema $id='%s' contains a $ref to itself: '%s'" % (schema_id, ref))


def validate_json_object_with_schema(json_object, schema):
    """
    Validate the given json_object with schema
    """
    get_validator_for_schema(schema, add_defaults=False).validate(json_object)