-
Jorrit Schaap authoredJorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
validation.py 5.84 KiB
import json
import jsonschema
from jsonschema import Draft6Validator, RefResolver
import os
from lofar.sas.tmss.tmss.exceptions import *
from lofar.common.json_utils import add_defaults_to_json_object_for_schema as json_utils_add_defaults_to_json
import requests
# Parent of the directory that contains this module.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Directory from which the local JSON schema files are loaded (note the trailing slash).
TMSS_SCHEMA_PATH = BASE_DIR + "/tmssapp/schemas/"
# File name of the common base schema; "$ref" values containing this name trigger the local RefResolver.
TMSS_COMMON_BASE_SCHEMA = "base.json"
# TODO: 'validation' is no longer an accurate name for this module; rename it to schema.py or schema_utils.py?
def validate_json_against_its_schema(json_object: dict):
    """Validate the given json object against its own schema.

    The schema is downloaded from the URI/URL that the object's '$schema' property points to.

    :param json_object: the json object to validate; it must contain a '$schema' property.
    :raises KeyError if the json_object has no '$schema' property
    :raises SchemaValidationException if the schema could not be fetched (HTTP error status),
            or if the json_object does not validate against the fetched schema
    """
    schema_url = json_object['$schema']

    # NOTE(review): no timeout is passed, so this call may block indefinitely on an unresponsive server.
    response = requests.get(schema_url)

    # The body of an error response (e.g. a 404 page) is not the schema. Without this check such
    # a body would be handed to the validator as if it were the schema the object points to.
    if not response.ok:
        raise SchemaValidationException("Could not fetch schema '%s': HTTP status %s" % (schema_url, response.status_code))

    return validate_json_against_schema(json_object, response.text)
def validate_json_against_schema(json_string: str, schema: str):
    """Validate the given json_string against the given schema.

    If no exception is thrown, then the given json_string validates against the given schema.
    Arguments that are not strings (for example already-parsed dicts) are serialized with
    json.dumps first, so both strings and json objects are accepted.

    :param json_string: the json document to validate (a string, or a json-serializable object)
    :param schema: the json schema to validate against (a string, or a json-serializable object)
    :raises SchemaValidationException if either argument is not valid JSON,
            or if the json_string does not validate against the schema
    """
    # ensure the given arguments are strings
    # (isinstance, so that str subclasses are not serialized a second time)
    if not isinstance(json_string, str):
        json_string = json.dumps(json_string)
    if not isinstance(schema, str):
        schema = json.dumps(schema)

    # ensure the specification and schema are both valid json in the first place
    try:
        json_object = json.loads(json_string)
    except json.decoder.JSONDecodeError as e:
        raise SchemaValidationException("Invalid JSON: %s\n%s" % (str(e), json_string)) from e

    try:
        schema_object = json.loads(schema)
    except json.decoder.JSONDecodeError as e:
        raise SchemaValidationException("Invalid JSON: %s\n%s" % (str(e), schema)) from e

    # now do the actual validation; the original error is kept as the cause for debugging
    try:
        validate_json_object_with_schema(json_object, schema_object)
    except jsonschema.ValidationError as e:
        raise SchemaValidationException(str(e)) from e
def get_default_json_object_for_schema(schema: str) -> dict:
    """
    Convenience wrapper around 'add_defaults_to_json_object_for_schema' that starts from an empty json object.
    :param schema: the schema to take the default values from
    :return: the result of adding the schema's defaults to an empty json object
    """
    empty_json_object = {}
    return add_defaults_to_json_object_for_schema(empty_json_object, schema)
def add_defaults_to_json_object_for_schema(json_object: dict, schema: str) -> dict:
    """
    TMSS wrapper for 'json_utils.add_defaults_to_json_object_for_schema'.
    A local RefResolver is passed along only when the schema refers to the common base schema.
    :param json_object: the json object to which the defaults are added
    :param schema: the schema to take the default values from
    :return: the result of the json_utils call (presumably a copy of json_object with the defaults added -- verify in json_utils)
    """
    # schemas without a reference to the base schema do not need a resolver
    if not check_ref_in_schema(schema):
        return json_utils_add_defaults_to_json(json_object, schema)

    return json_utils_add_defaults_to_json(json_object, schema, create_local_ref_resolver())
def validate_json_object_with_schema(json_object, schema):
    """
    Validate the given json_object against the given schema using a Draft6Validator.
    A local RefResolver is used only when the schema refers to the common base schema.
    :param json_object: the (parsed) json object to validate
    :param schema: the (parsed) json schema to validate against
    :return: None; the validator raises when the json_object does not validate
    """
    if check_ref_in_schema(schema):
        validator = Draft6Validator(schema=schema, resolver=create_local_ref_resolver())
    else:
        validator = Draft6Validator(schema=schema)

    validator.validate(json_object)
def item_generator(json_input, lookup_key):
    """
    Generator that walks a nested json structure (dicts and lists) depth-first and
    yields the value of every dict entry whose key equals lookup_key.
    A matched value is yielded as-is and is not searched any further.
    :param json_input: the (nested) json structure to search; anything that is not a dict or list yields nothing
    :param lookup_key: the key to look for
    :return: values of the key found in json
    """
    if isinstance(json_input, list):
        for element in json_input:
            yield from item_generator(element, lookup_key)
        return

    if not isinstance(json_input, dict):
        # scalars (and any other type) cannot contain the key
        return

    for key, value in json_input.items():
        if key != lookup_key:
            yield from item_generator(value, lookup_key)
            continue
        yield value
def check_ref_in_schema(schema):
    """
    Check if there is a reference to the base schema (base.json) for the given schema.
    Every "$ref" value found anywhere in the (nested) schema is inspected; the search stops
    at the first one that contains the base schema file name.
    :param schema: the (parsed) json schema to inspect
    :return: True or False
    """
    # Only string values can be references to the base schema. Non-string "$ref" values
    # (for example a property that happens to be named "$ref") are skipped instead of being
    # membership-tested or raising a TypeError.
    return any(isinstance(ref, str) and TMSS_COMMON_BASE_SCHEMA in ref
               for ref in item_generator(schema, "$ref"))
def create_local_ref_resolver(abs_path_to_schema=TMSS_SCHEMA_PATH, base_schema_file_name=TMSS_COMMON_BASE_SCHEMA):
    """
    Create jsonschema.RefResolver class for all schemas located in given path.
    Apparently if you are using jsonschema you have to set a reference resolver to referred schemas
    for local files:
    https://stackoverflow.com/questions/25145160/json-schema-ref-does-not-work-for-relative-path
    https://stackoverflow.com/questions/53968770/how-to-set-up-local-file-references-in-python-jsonschema-document
    https://stackoverflow.com/questions/42159346/jsonschema-refresolver-to-resolve-multiple-refs-in-python
    :param abs_path_to_schema: Absolute path to location of all schemas
    :param base_schema_file_name: The name of the common base schema that other schemas refer to
    :return resolver_object: RefResolver object
    """
    # Build the path to the base schema once, and use it both for reading the file and for the
    # base URI, so that both agree even when abs_path_to_schema has no trailing path separator.
    base_schema_path = os.path.join(abs_path_to_schema, base_schema_file_name)
    with open(base_schema_path, 'r', encoding='utf-8') as fp:
        def_schema = json.load(fp)

    # Map the "$id" of each schema file in the directory to its parsed contents.
    schema_store = {}
    for file_name in os.listdir(abs_path_to_schema):
        if not file_name.endswith(".json"):
            continue
        with open(os.path.join(abs_path_to_schema, file_name), "r", encoding="utf-8") as fp:
            schema = json.load(fp)
        # only json objects can carry an "$id"; skip documents with another top-level type
        if isinstance(schema, dict) and "$id" in schema:
            schema_store[schema["$id"]] = schema

    # wondering if schema_store is still required ... ??
    # https://stackoverflow.com/questions/42159346/jsonschema-refresolver-to-resolve-multiple-refs-in-python
    # NOTE(review): the "file:///" prefix is kept as it was; combined with an absolute POSIX path
    # (which itself starts with "/") this yields four slashes -- confirm before normalizing.
    base_uri = "file:///" + base_schema_path
    return RefResolver(base_uri=base_uri, referrer=def_schema, store=schema_store)