Skip to content
Snippets Groups Projects
Select Git revision
  • c37f3e36a5752adac6bd9ca9c23a333dc32c2a16
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

populate.py

Blame
  • Jorrit Schaap's avatar
    TMSS-860: specialized method to get latest strategy template
    Jorrit Schaap authored
    c37f3e36
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    populate.py 14.25 KiB
    import logging
    logger = logging.getLogger(__name__)
    
    import json
    from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
    from lofar.common import json_utils
    import os, os.path
    from concurrent.futures import ThreadPoolExecutor
    
    def populate_main():
        """Command-line entry point for populating TMSS.

        Parses the command-line options and, depending on the given flags:
        uploads the templates/schemas found in the schemas directory (--schemas),
        creates the default task connector types (--connectors) and/or
        creates the default permissions (--permissions).
        """
        from optparse import OptionGroup, OptionParser

        # define the accepted invocation arguments
        cli_parser = OptionParser('%prog [options]', description='upload the templates to TMSS')

        populate_group = OptionGroup(cli_parser, 'Populate options')
        cli_parser.add_option_group(populate_group)
        populate_group.add_option(
            '-s', '--schemas',
            action="store_true",
            dest="schemas",
            default=False,
            help='upload the templates and schemas in the given <schemas_dir> to TMSS, default: %default',
        )
        populate_group.add_option(
            '-d', '--schemas_dir',
            dest='schemas_dir',
            type='string',
            default=os.path.expandvars('$LOFARROOT/share/tmss/schemas'),
            help='''directory path containing the schemas, default: '%default')''',
        )
        populate_group.add_option(
            '-c', '--connectors',
            action="store_true",
            dest="connectors",
            default=False,
            help='create the default task_connector_types for all known task_templates in TMSS, default: %default',
        )
        populate_group.add_option(
            '-p', '--permissions',
            action="store_true",
            dest="permissions",
            default=False,
            help='create the default permissions for TMSS, default: %default',
        )

        django_group = OptionGroup(cli_parser, 'Django options')
        cli_parser.add_option_group(django_group)
        django_group.add_option(
            '-C', '--credentials',
            dest='dbcredentials',
            type='string',
            default=os.environ.get('TMSS_DBCREDENTIALS', 'TMSS'),
            help='django dbcredentials name, default: %default',
        )
        django_group.add_option(
            '-R', '--rest_credentials',
            dest='rest_credentials',
            type='string',
            default='TMSSClient',
            help='django REST API credentials name, default: %default',
        )

        parsed_options, _positional_args = cli_parser.parse_args()

        if parsed_options.schemas:
            # from the commandline we upload sequentially (parallel=False),
            # so the resulting log lines stay in order and remain readable
            populate_schemas(parsed_options.schemas_dir, parsed_options.rest_credentials, parallel=False)

        # nothing more to do when neither connectors nor permissions were requested
        if not (parsed_options.connectors or parsed_options.permissions):
            return

        # the connectors and permissions depend on the (now uploaded) schemas,
        # and they need a working django database connection
        from lofar.sas.tmss.tmss import setup_and_check_tmss_django_database_connection_and_exit_on_error
        setup_and_check_tmss_django_database_connection_and_exit_on_error(parsed_options.dbcredentials)

        if parsed_options.connectors:
            from lofar.sas.tmss.tmss.tmssapp.populate import populate_connectors
            populate_connectors()

        if parsed_options.permissions:
            from lofar.sas.tmss.tmss.tmssapp.populate import populate_permissions
            populate_permissions()
    
    
    def populate_schemas(schema_dir: str=None, dbcreds_name: str=None, parallel: bool=True):
        """Read all json template/schema files below schema_dir and upload them to TMSS via the REST client.

        The template type, name and version are derived by convention from the relative file path:
        '<template_type_name>[/<type>]/<template_name>-<template_version>.json'.
        Values found in the json content itself ('title'/'name', 'description', 'version') take precedence
        over the values derived from the file name.

        Upload order:
          1. all schema templates which are referenced by other schemas (with their own references first),
          2. the remaining schema templates (in parallel if requested),
          3. the reservation-, observing- and scheduling_set-strategy templates (in parallel if requested).

        :param schema_dir: root directory containing the json files. Defaults to '$LOFARROOT/share/tmss/schemas'.
        :param dbcreds_name: credentials name handed to TMSSsession.create_from_dbcreds_for_ldap.
        :param parallel: when True use the default number of worker threads for uploading, else a single worker.
        :raises Exception: when a json file cannot be decoded, or when the template info cannot be gathered from it.
                           Errors during the actual upload are logged and not raised.
        """
        with TMSSsession.create_from_dbcreds_for_ldap(dbcreds_name=dbcreds_name) as client:
            DEFAULT_SCHEMA_DIR = os.path.expandvars('$LOFARROOT/share/tmss/schemas')

            if schema_dir is None:
                schema_dir = DEFAULT_SCHEMA_DIR

            logger.info("Reading templates in: %s", schema_dir)

            # get relative paths for all json files in the schema_dir (and subdirs)
            json_file_paths = [os.path.relpath(os.path.join(_root, f), schema_dir)
                               for _root, _dirs, _files in os.walk(schema_dir)
                               for f in _files if f.endswith('.json')]

            logger.info("Found %s template files in: %s", len(json_file_paths), schema_dir)

            # keep track of the templates, json schemas and references
            schema_templates_dict = {}
            observing_strategy_templates = []
            scheduling_set_strategy_templates = []
            reservation_strategy_templates = []
            schema_references = {}
            all_references = set()

            # load all templates and schemas and prepare them for upload.
            # determine the dependencies, and upload the dependents first, and the rest in parallel later.
            for json_file_path in json_file_paths:
                with open(os.path.join(schema_dir, json_file_path)) as schema_file:
                    content = schema_file.read()
                try:
                    json_content = json.loads(content)
                except Exception as e:
                    raise Exception("Could not decode JSON schema file: '%s'\n%s" % (json_file_path, content)) from e

                try:
                    # deduce template_type_name, template_name, template_version from json_file_path (which are encoded by convention)
                    json_dir_path, json_file_name = os.path.split(json_file_path)
                    json_dir_path_parts = json_dir_path.split('/')
                    template_type_name = json_dir_path_parts[0]
                    # strip the '.json' extension, the last '-'-separated part is the version
                    json_file_name_parts = json_file_name[:-5].split('-')
                    template_name = '-'.join(json_file_name_parts[:-1])
                    template_version = json_file_name_parts[-1]

                    # gather all info for the template that this json_content is part of
                    template_info = {}
                    template_info['name'] = json_content.get('title', json_content.get('schema', {}).get('title', json_content.get('name', template_name)))
                    template_info['description'] = json_content.get('description', json_content.get('schema', {}).get('description', '<no description>'))
                    template_info['version'] = json_content.get('version', json_content.get('schema', {}).get('version', template_version))
                    template_info['template_type_name'] = template_type_name

                    if 'state' in json_content:
                        template_info['state'] = client.get_full_url_for_path('template_state/' + json_content['state'])

                    if 'purpose' in json_content:
                        template_info['purpose'] = client.get_full_url_for_path('template_purpose/' + json_content['purpose'])

                    if len(json_dir_path_parts)>1:
                        template_info['type'] = json_dir_path_parts[1]

                        # override plain-text type by its url
                        if template_type_name == 'subtask_template':
                            template_info['type'] = client.get_full_url_for_path('subtask_type/' + template_info.get('type'))
                        elif template_type_name == 'task_template':
                            template_info['type'] = client.get_full_url_for_path('task_type/' + template_info.get('type'))

                    # make sure that all urls point to the tmss base_url
                    json_content = json_utils.replace_host_in_urls(json_content, new_base_url=client.host_url)

                    # get the schema id (if any) without trailing # and/or /
                    json_schema_id = json_content.get('schema',{}).get('$id', "").rstrip("#").rstrip("/")

                    if 'strategy_template' in template_type_name:
                        template_info['template'] = json_content['template']
                    else:
                        template_info['schema'] = json_content['schema']

                        # inject a unique id in the form of a unique URL to this schema
                        template_info['schema']['$id'] = client.get_full_url_for_path('schemas/%s/%s/%s#' % (template_type_name.replace('_', ''), template_info['name'], template_info['version']))

                    # what are the references? on which other schema's does this schema depend?
                    # Only absolute (http...) references to other schemas count; the fragment part after '#' is dropped.
                    # partition() is used instead of slicing up to find('#'), because find() returns -1 for a
                    # reference without '#', which would silently chop off the last character of the url.
                    # NOTE(review): when json_schema_id is empty, startswith('') is True for every ref, so no references
                    # are recorded for files without a schema $id (e.g. strategy templates) - confirm this is intended.
                    refs = set(ref.partition('#')[0].rstrip('/') for ref in json_utils.get_refs(json_content) if not ref.startswith(json_schema_id) and ref.startswith("http"))
                    schema_references[json_schema_id] = refs
                    all_references.update(refs)

                    # store the prepared template for upload
                    if template_type_name == 'scheduling_unit_observing_strategy_template':
                        template_info['referenced_template'] = json_content['scheduling_unit_template']
                        template_info['referenced_template']['type'] = 'scheduling_unit_template'
                        observing_strategy_templates.append(template_info)
                    elif template_type_name == 'scheduling_set_strategy_template':
                        template_info['referenced_template'] = json_content['scheduling_set_template']
                        template_info['referenced_template']['type'] = 'scheduling_set_template'
                        scheduling_set_strategy_templates.append(template_info)
                    elif template_type_name == 'reservation_strategy_template':
                        template_info['referenced_template'] = json_content['reservation_template']
                        template_info['referenced_template']['type'] = 'reservation_template'
                        reservation_strategy_templates.append(template_info)
                    else:
                        # NOTE(review): schema templates are keyed by their original $id; several files without
                        # a $id would share the '' key and overwrite each other - verify all schemas carry a $id.
                        schema_templates_dict[json_schema_id] = template_info
                except Exception as e:
                    raise Exception("Could not gather template_info for file: '%s'\n%s" % (json_file_path, e)) from e

            # helper functions for uploading
            def upload_template(template: dict):
                """
                Upload a single prepared schema template.
                Skips the upload when a template with the same name/version and an equal schema is already known,
                updates (put) the known template when the schema differs, and creates (post) it when the lookup
                raises a ValueError. Any other error is logged, not raised.
                """
                try:
                    template_type_name = template.pop('template_type_name')
                    try:
                        known_template = client._get_schema_template(template_type_name=template_type_name, name=template['name'], version=template['version'])
                        if known_template['schema'] == template['schema']:
                            logger.info("Skipping template with name='%s' version='%s' because it is already known and the contents did not change: url='%s'",
                                        template['name'], template['version'], known_template['url'])
                        else:
                            logger.info("Uploading template with name='%s' version='%s' for existing template with same name/version. This may result in this template getting an auto-incremented version number when the template was already used.", template['name'], template['version'])
                            client.put_template(id=known_template['id'], template_path=template_type_name, **template)
                    except ValueError:
                        # presumably raised by the lookup when no such template exists yet - so create it
                        logger.info("Uploading template with name='%s' version='%s'", template['name'], template['version'])
                        client.post_template(template_path=template_type_name, **template)
                except Exception as e:
                    logger.error("Error while uploading template with name='%s' version='%s': %s",
                                 template['name'], template['version'], e)

            def upload_template_if_needed_with_dependents_first(schema_id: str):
                """
                Upload the not-yet-uploaded schema template with the given schema_id,
                after first (recursively) uploading the schema templates it references.
                """
                if schema_id in schema_templates_dict:
                    # take the template out of the to-do dict *before* recursing, so that schemas which
                    # (indirectly) reference each other cannot cause an endless recursion.
                    template = schema_templates_dict.pop(schema_id)

                    # recurse over dependents if any
                    for ref in schema_references.get(schema_id, []):
                        upload_template_if_needed_with_dependents_first(ref)

                    upload_template(template)

            def upload_strategy_templates(template: dict):
                """
                Helper function for uploading a single prepared strategy template.
                Uses template['referenced_template'] (with its 'type', 'name' and 'version') to look up the url of
                the template this strategy refers to, and injects that url under the key named by the referenced type.
                Uses template['template_type_name'] as the REST path for the strategy template itself.
                The strategy is skipped when the referenced template is not (uniquely) found, or when an equal
                strategy with the same name/version is already known. Errors are logged, not raised.
                """
                try:
                    # lookup the url for the referenced template...
                    referenced_template = template.pop('referenced_template')
                    response_templates = client.get_path_as_json_object(referenced_template['type']+'?name=' + referenced_template['name'] + '&version=' + str(referenced_template['version']))

                    if len(response_templates) != 1:
                        logger.info("Skipping strategy template with name='%s' version='%s' it references an unknown template with name='%s' version='%s'",
                                    template['name'], template['version'], referenced_template['name'], referenced_template['version'])
                        return

                    # ... and inject it in the to-be-uploaded template
                    template[referenced_template['type']] = response_templates[0]['url']

                    strategy_template_type = template.pop('template_type_name')
                    try:
                        known_template = client._get_schema_template(template_type_name=strategy_template_type, name=template['name'], version=template['version'])
                        if known_template['template'] == template['template']:
                            logger.info("Skipping strategy template with name='%s' version='%s' because it is already known and the contents did not change: url='%s'", template['name'], template['version'], known_template['url'])
                        else:
                            logger.info("Uploading strategy with type='%s' name='%s' version='%s' for existing template with same name/version. This may result in this template getting an auto-incremented version number when the template was already used.",
                                        strategy_template_type, template['name'], template['version'])
                            client.put_template(id=known_template['id'], template_path=strategy_template_type, **template)
                    except ValueError:
                        # presumably raised by the lookup when no such strategy template exists yet - so create it
                        logger.info("Uploading strategy with type='%s' name='%s' version='%s'", strategy_template_type, template['name'], template['version'])
                        client.post_template(template_path=strategy_template_type, **template)
                except Exception as e:
                    logger.error("Could not upload strategy with name='%s' version='%s' error: %s", template['name'], template['version'], e)

            # first, upload all dependent templates
            for ref in all_references:
                upload_template_if_needed_with_dependents_first(ref)

            # then, upload the remaining templates in parallel
            rest_templates = list(schema_templates_dict.values())
            with ThreadPoolExecutor(max_workers=None if parallel else 1) as executor:
                executor.map(upload_template, rest_templates)

            # and finally, the strategy_templates
            for strategy_templates in (reservation_strategy_templates,
                                       observing_strategy_templates,
                                       scheduling_set_strategy_templates):
                with ThreadPoolExecutor(max_workers=None if parallel else 1) as executor:
                    executor.map(upload_strategy_templates, strategy_templates)
    
    # Run the command-line entry point only when this file is executed as a script,
    # not when it is imported as a module.
    if __name__ == '__main__':
        populate_main()