Select Git revision
populate.py

TMSS-860: specialized method to get latest strategy template
Jorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
populate.py 14.25 KiB
import logging
logger = logging.getLogger(__name__)
import json
from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
from lofar.common import json_utils
import os, os.path
from concurrent.futures import ThreadPoolExecutor
def populate_main():
    """Command line entry point for populating TMSS.

    Depending on the given flags it:
      * -s/--schemas:     uploads the templates/schemas found in --schemas_dir via the REST API,
      * -c/--connectors:  creates the default task_connector_types (needs a django db connection),
      * -p/--permissions: creates the default permissions (needs a django db connection).
    Without any of these flags nothing is done.
    """
    from optparse import OptionParser, OptionGroup

    # Check the invocation arguments
    parser = OptionParser('%prog [options]', description='upload the templates to TMSS')

    group = OptionGroup(parser, 'Populate options')
    parser.add_option_group(group)
    group.add_option('-s', '--schemas', action="store_true", dest="schemas", default=False, help='upload the templates and schemas in the given <schemas_dir> to TMSS, default: %default')
    group.add_option('-d', '--schemas_dir', dest='schemas_dir', type='string',
                     default=os.path.expandvars('$LOFARROOT/share/tmss/schemas'),
                     help="directory path containing the schemas, default: '%default'")
    group.add_option('-c', '--connectors', action="store_true", dest="connectors", default=False, help='create the default task_connector_types for all known task_templates in TMSS, default: %default')
    group.add_option('-p', '--permissions', action="store_true", dest="permissions", default=False, help='create the default permissions for TMSS, default: %default')

    group = OptionGroup(parser, 'Django options')
    parser.add_option_group(group)
    group.add_option('-C', '--credentials', dest='dbcredentials', type='string', default=os.environ.get('TMSS_DBCREDENTIALS', 'TMSS'), help='django dbcredentials name, default: %default')
    group.add_option('-R', '--rest_credentials', dest='rest_credentials', type='string', default='TMSSClient', help='django REST API credentials name, default: %default')

    # positional arguments are not used
    (options, _args) = parser.parse_args()

    if options.schemas:
        # when called from the commandline (like in this case), don't use parallel uploading to make the log lines sequential/readable
        populate_schemas(options.schemas_dir, options.rest_credentials, parallel=False)

    if options.connectors or options.permissions:
        # now that the schemas were uploaded, let's populate the dependent task connectors, and the permissions.
        # These steps talk to the database via django, so set up (and verify) that connection first.
        from lofar.sas.tmss.tmss import setup_and_check_tmss_django_database_connection_and_exit_on_error
        setup_and_check_tmss_django_database_connection_and_exit_on_error(options.dbcredentials)

        if options.connectors:
            from lofar.sas.tmss.tmss.tmssapp.populate import populate_connectors
            populate_connectors()

        if options.permissions:
            from lofar.sas.tmss.tmss.tmssapp.populate import populate_permissions
            populate_permissions()
def populate_schemas(schema_dir: str=None, dbcreds_name: str=None, parallel: bool=True):
    """Upload all json templates/schemas found (recursively) in schema_dir to TMSS via its REST API.

    The template type/name/version are derived from the file location by convention:
    ``<template_type_name>[/<type>]/<name>-<version>.json``. Values present in the json
    content itself (title/name, description, version) take precedence over the file name.

    Upload order:
      1. schema templates referenced by other schemas (recursively, referenced ones first),
      2. all remaining schema templates (in parallel when ``parallel`` is True),
      3. the strategy templates (reservation, observing, scheduling_set), which point
         to previously uploaded templates.

    :param schema_dir: directory to scan; defaults to ``$LOFARROOT/share/tmss/schemas``.
    :param dbcreds_name: credentials name used to create the TMSS REST client session.
    :param parallel: when False, uploads run with a single worker so log lines stay sequential.
    :raises Exception: when a json file cannot be decoded, or its template info cannot be gathered.
                       Failures while uploading individual templates are logged, not raised.
    """
    # local import: only used to url-encode the query for the referenced-template lookup
    import urllib.parse

    with TMSSsession.create_from_dbcreds_for_ldap(dbcreds_name=dbcreds_name) as client:
        DEFAULT_SCHEMA_DIR = os.path.expandvars('$LOFARROOT/share/tmss/schemas')
        if schema_dir is None:
            schema_dir = DEFAULT_SCHEMA_DIR
        logger.info("Reading templates in: %s", schema_dir)

        # get relative paths for all json files in the schema_dir (and subdirs)
        json_file_paths = [os.path.relpath(os.path.join(_root, f), schema_dir)
                           for _root, _dirs, _files in os.walk(schema_dir)
                           for f in _files if f.endswith('.json')]
        logger.info("Found %s template files in: %s", len(json_file_paths), schema_dir)

        # keep track of the templates, json schemas and references
        schema_templates_dict = {}
        observing_strategy_templates = []
        scheduling_set_strategy_templates = []
        reservation_strategy_templates = []
        schema_references = {}
        all_references = set()

        # load all templates and schemas and prepare them for upload.
        # determine the dependencies, and upload the dependents first, and the rest in parallel later.
        for json_file_path in json_file_paths:
            with open(os.path.join(schema_dir, json_file_path)) as schema_file:
                content = schema_file.read()

            try:
                json_content = json.loads(content)
            except Exception as e:
                raise Exception("Could not decode JSON schema file: '%s'\n%s" % (json_file_path, content)) from e

            try:
                # deduct template_type_name, template_name, template_version from json_file_path (which are encoded by convention)
                json_dir_path, json_file_name = os.path.split(json_file_path)
                # relpath yields os.sep-separated paths
                json_dir_path_parts = json_dir_path.split(os.sep)
                template_type_name = json_dir_path_parts[0]
                json_file_name_parts = json_file_name[:-len('.json')].split('-')
                template_name = '-'.join(json_file_name_parts[:-1])
                template_version = json_file_name_parts[-1]

                # gather all info for the template that this json_content is part of
                template_info = {}
                template_info['name'] = json_content.get('title', json_content.get('schema', {}).get('title', json_content.get('name', template_name)))
                template_info['description'] = json_content.get('description', json_content.get('schema', {}).get('description', '<no description>'))
                template_info['version'] = json_content.get('version', json_content.get('schema', {}).get('version', template_version))
                template_info['template_type_name'] = template_type_name

                if 'state' in json_content:
                    template_info['state'] = client.get_full_url_for_path('template_state/' + json_content['state'])

                if 'purpose' in json_content:
                    template_info['purpose'] = client.get_full_url_for_path('template_purpose/' + json_content['purpose'])

                if len(json_dir_path_parts) > 1:
                    template_info['type'] = json_dir_path_parts[1]

                # override plain-text type by its url
                if template_type_name == 'subtask_template':
                    template_info['type'] = client.get_full_url_for_path('subtask_type/' + template_info.get('type'))
                elif template_type_name == 'task_template':
                    template_info['type'] = client.get_full_url_for_path('task_type/' + template_info.get('type'))

                # make sure that all urls point to the tmss base_url
                json_content = json_utils.replace_host_in_urls(json_content, new_base_url=client.host_url)

                # get the schema id (if any) without trailing # and/or /
                json_schema_id = json_content.get('schema', {}).get('$id', "").rstrip("#").rstrip("/")

                if 'strategy_template' in template_type_name:
                    template_info['template'] = json_content['template']
                else:
                    template_info['schema'] = json_content['schema']

                    # inject a unique id in the form of a unique URL to this schema
                    template_info['schema']['$id'] = client.get_full_url_for_path('schemas/%s/%s/%s#' % (template_type_name.replace('_', ''), template_info['name'], template_info['version']))

                # what are the references? on which other schema's does this schema depend?
                # Strip the '#fragment' (if any) and trailing '/' so each ref is comparable to a schema id.
                # (partition, unlike ref[:ref.find('#')], keeps the whole url when there is no '#')
                refs = set(ref.partition('#')[0].rstrip('/') for ref in json_utils.get_refs(json_content) if ref.startswith("http"))
                # a schema referring to (parts of) itself is not a dependency.
                # Exact match, so that e.g. '.../1' does not hide a reference to '.../10'.
                refs.discard(json_schema_id)

                # schemas without an $id are keyed by their (unique) file path, so they cannot collide on ""
                template_key = json_schema_id or json_file_path
                schema_references[template_key] = refs
                all_references.update(refs)

                # store the prepared template for upload
                if template_type_name == 'scheduling_unit_observing_strategy_template':
                    template_info['referenced_template'] = json_content['scheduling_unit_template']
                    template_info['referenced_template']['type'] = 'scheduling_unit_template'
                    observing_strategy_templates.append(template_info)
                elif template_type_name == 'scheduling_set_strategy_template':
                    template_info['referenced_template'] = json_content['scheduling_set_template']
                    template_info['referenced_template']['type'] = 'scheduling_set_template'
                    scheduling_set_strategy_templates.append(template_info)
                elif template_type_name == 'reservation_strategy_template':
                    template_info['referenced_template'] = json_content['reservation_template']
                    template_info['referenced_template']['type'] = 'reservation_template'
                    reservation_strategy_templates.append(template_info)
                else:
                    schema_templates_dict[template_key] = template_info
            except Exception as e:
                raise Exception("Could not gather template_info for file: '%s'\n%s" % (json_file_path, e)) from e

        # helper functions for uploading
        def upload_template(template: dict):
            """Create the schema template, update it when its schema changed, or skip it when unchanged.
            Errors are logged, not raised."""
            try:
                template_type_name = template.pop('template_type_name')
                try:
                    known_template = client._get_schema_template(template_type_name=template_type_name, name=template['name'], version=template['version'])
                    if known_template['schema'] == template['schema']:
                        logger.info("Skipping template with name='%s' version='%s' because it is already known and the contents did not change: url='%s'",
                                    template['name'], template['version'], known_template['url'])
                    else:
                        logger.info("Uploading template with name='%s' version='%s' for existing template with same name/version. This may result in this template getting an auto-incremented version number when the template was already used.", template['name'], template['version'])
                        client.put_template(id=known_template['id'], template_path=template_type_name, **template)
                except ValueError:
                    # no template with this name/version is known yet
                    logger.info("Uploading template with name='%s' version='%s'", template['name'], template['version'])
                    client.post_template(template_path=template_type_name, **template)
            except Exception as e:
                logger.error("Error while uploading template with name='%s' version='%s': %s",
                             template['name'], template['version'], e)

        def upload_template_if_needed_with_dependents_first(schema_id: str):
            """Upload the not-yet-uploaded schema template with the given id, after (recursively) uploading the templates it references."""
            # pop *before* recursing, so that a reference cycle terminates instead of recursing forever
            template = schema_templates_dict.pop(schema_id, None)
            if template is None:
                return
            for ref in schema_references.get(schema_id, []):
                upload_template_if_needed_with_dependents_first(ref)
            upload_template(template)

        def upload_strategy_templates(template: dict):
            """
            Helper function for uploading strategy_templates.
            template['referenced_template'] (name/version/type) is resolved to the url of an already uploaded
            template, which is injected under the key template[<type>]. template['template_type_name'] is
            the REST path used for the upload. Errors are logged, not raised.
            """
            try:
                # lookup the url for the referenced template...
                referenced_template = template.pop('referenced_template')
                query = urllib.parse.urlencode({'name': referenced_template['name'],
                                                'version': referenced_template['version']},
                                               quote_via=urllib.parse.quote)
                response_templates = client.get_path_as_json_object(referenced_template['type'] + '?' + query)
                if len(response_templates) != 1:
                    logger.info("Skipping strategy template with name='%s' version='%s' it references an unknown template with name='%s' version='%s'",
                                template['name'], template['version'], referenced_template['name'], referenced_template['version'])
                    return

                # ... and inject it in the to-be-uploaded template
                template[referenced_template['type']] = response_templates[0]['url']

                strategy_template_type = template.pop('template_type_name')
                try:
                    known_template = client._get_schema_template(template_type_name=strategy_template_type, name=template['name'], version=template['version'])
                    if known_template['template'] == template['template']:
                        logger.info("Skipping strategy template with name='%s' version='%s' because it is already known and the contents did not change: url='%s'", template['name'], template['version'], known_template['url'])
                    else:
                        logger.info("Uploading strategy with type='%s' name='%s' version='%s' for existing template with same name/version. This may result in this template getting an auto-incremented version number when the template was already used.",
                                    strategy_template_type, template['name'], template['version'])
                        client.put_template(id=known_template['id'], template_path=strategy_template_type, **template)
                except ValueError:
                    # no strategy template with this name/version is known yet
                    logger.info("Uploading strategy with type='%s' name='%s' version='%s'", strategy_template_type, template['name'], template['version'])
                    client.post_template(template_path=strategy_template_type, **template)
            except Exception as e:
                logger.error("Could not upload strategy with name='%s' version='%s' error: %s", template['name'], template['version'], e)

        # first, upload all dependent templates
        for ref in all_references:
            upload_template_if_needed_with_dependents_first(ref)

        # then, upload the remaining templates in parallel
        rest_templates = list(schema_templates_dict.values())
        with ThreadPoolExecutor(max_workers=None if parallel else 1) as executor:
            # consume the results so an exception escaping the helper is re-raised here instead of silently lost
            list(executor.map(upload_template, rest_templates))

        # and finally, the strategy_templates
        for strategy_templates in (reservation_strategy_templates,
                                   observing_strategy_templates,
                                   scheduling_set_strategy_templates):
            with ThreadPoolExecutor(max_workers=None if parallel else 1) as executor:
                list(executor.map(upload_strategy_templates, strategy_templates))
if __name__ == '__main__':
    # run the command line entry point only when executed as a script, not when imported
    populate_main()