diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/common.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/common.py index 75b834097b410f744a568b33c67b800ebc82a369..496865f127a8cfa0accff1656be83c307abb2500 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/common.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/common.py @@ -154,6 +154,10 @@ class Template(NamedVersionedCommon): '''get a json document object (dict) which complies with this template's schema and with all the defaults filled in.''' return get_default_json_object_for_schema(self.schema, cache=TemplateSchemaMixin._schema_cache, max_cache_age=TemplateSchemaMixin._MAX_SCHEMA_CACHE_AGE) + def add_defaults_to_json_object_for_schema(self, json_object: dict) -> dict: + '''add any missing default property to the given json document object (dict).''' + return add_defaults_to_json_object_for_schema(json_object, self.schema, cache=TemplateSchemaMixin._schema_cache, max_cache_age=TemplateSchemaMixin._MAX_SCHEMA_CACHE_AGE) + def validate_and_annotate_schema(self): '''validate this template's schema, check for the required properties '$id', '$schema', 'title', 'description', and annotate this schema with the template's name, description and version.''' diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 718a8a1ecaf3233ce1db01345be5bae360fa8ce3..d392fdc87f65bf49815e3ecf80a6e3aac27cff83 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -12,7 +12,6 @@ from os.path import splitext from lofar.common.datetimeutils import formatDatetime, round_to_second_precision from lofar.common import isProductionEnvironment -from lofar.common.json_utils import add_defaults_to_json_object_for_schema, get_default_json_object_for_schema from lofar.common.lcu_utils import get_current_stations from lofar.stationmodel.antennafields import antenna_fields @@ -20,7 +19,6 @@ from lofar.sas.tmss.tmss.exceptions import 
SubtaskCreationException, SubtaskSche from datetime import datetime, timedelta from lofar.common.datetimeutils import parseDatetime -from lofar.common.json_utils import add_defaults_to_json_object_for_schema from lofar.sas.tmss.tmss.tmssapp.models import * from lofar.sas.resourceassignment.resourceassigner.rarpc import RARPC from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC @@ -33,9 +31,6 @@ from lofar.sas.tmss.tmss.tmssapp.conversions import antennafields_for_antennaset from lofar.sas.tmss.tmss.exceptions import TMSSException from django.db import transaction -# cache for json schema's -_schema_cache = {} - # ==== various create* methods to convert/create a TaskBlueprint into one or more Subtasks ==== def check_prerequities_for_subtask_creation(task_blueprint: TaskBlueprint) -> bool: @@ -183,7 +178,8 @@ def create_observation_subtask_specifications_from_observation_task_blueprint(ta # start with an observation subtask specification with all the defaults and the right structure according to the schema subtask_template = SubtaskTemplate.objects.get(name='observation control') - subtask_spec = get_default_json_object_for_schema(subtask_template.schema, cache=_schema_cache) + subtask_spec = subtask_template.get_default_json_document_for_schema() + # wipe the default pointings, these should come from the task_spec subtask_spec['stations'].pop('analog_pointing', None) @@ -540,7 +536,7 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) # step 1: create subtask in defining state, with filled-in subtask_template qafile_subtask_template = SubtaskTemplate.objects.get(name="QA file conversion") - qafile_subtask_spec = add_defaults_to_json_object_for_schema({}, qafile_subtask_template.schema, cache=_schema_cache) + qafile_subtask_spec = qafile_subtask_template.get_default_json_document_for_schema() qafile_subtask_spec['nr_of_subbands'] = obs_task_qafile_spec.get("nr_of_subbands") 
qafile_subtask_spec['nr_of_timestamps'] = obs_task_qafile_spec.get("nr_of_timestamps") validate_json_against_schema(qafile_subtask_spec, qafile_subtask_template.schema) @@ -557,7 +553,7 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) # step 2: create and link subtask input/output selection_template = TaskRelationSelectionTemplate.objects.get(name="all") - selection_doc = get_default_json_object_for_schema(selection_template.schema, cache=_schema_cache) + selection_doc = selection_template.get_default_json_document_for_schema() for obs_out in observation_subtask.outputs.all(): qafile_subtask_input = SubtaskInput.objects.create(subtask=qafile_subtask, @@ -618,7 +614,7 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta # step 1: create subtask in defining state, with filled-in subtask_template qaplots_subtask_template = SubtaskTemplate.objects.get(name="QA plots") - qaplots_subtask_spec_doc = add_defaults_to_json_object_for_schema({}, qaplots_subtask_template.schema, cache=_schema_cache) + qaplots_subtask_spec_doc = qaplots_subtask_template.get_default_json_document_for_schema() qaplots_subtask_spec_doc['autocorrelation'] = obs_task_qaplots_spec.get("autocorrelation") qaplots_subtask_spec_doc['crosscorrelation'] = obs_task_qaplots_spec.get("crosscorrelation") validate_json_against_schema(qaplots_subtask_spec_doc, qaplots_subtask_template.schema) @@ -635,7 +631,7 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta # step 2: create and link subtask input/output selection_template = TaskRelationSelectionTemplate.objects.get(name="all") - selection_doc = get_default_json_object_for_schema(selection_template.schema, cache=_schema_cache) + selection_doc = selection_template.get_default_json_document_for_schema() qaplots_subtask_input = SubtaskInput.objects.create(subtask=qaplots_subtask, producer=qafile_subtask.outputs.first(), selection_doc=selection_doc, @@ -668,8 +664,8 
@@ def create_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint, s # step 1: create subtask in defining state, with filled-in subtask_template subtask_template = SubtaskTemplate.objects.get(name=subtask_template_name) - default_subtask_specs = get_default_json_object_for_schema(subtask_template.schema, cache=_schema_cache) - task_specs_with_defaults = add_defaults_to_json_object_for_schema(task_blueprint.specifications_doc, task_blueprint.specifications_template.schema, cache=_schema_cache) + default_subtask_specs = subtask_template.get_default_json_document_for_schema() + task_specs_with_defaults = task_blueprint.specifications_template.add_defaults_to_json_object_for_schema(task_blueprint.specifications_doc) subtask_specs = generate_subtask_specs_from_task_spec_func(task_specs_with_defaults, default_subtask_specs) cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4") @@ -725,7 +721,7 @@ def create_ingest_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> # step 1: create subtask in defining state, with filled-in subtask_template subtask_template = SubtaskTemplate.objects.get(name='ingest control') - default_subtask_specs = get_default_json_object_for_schema(subtask_template.schema, cache=_schema_cache) + default_subtask_specs = subtask_template.get_default_json_document_for_schema() subtask_specs = default_subtask_specs # todo: translate specs from task to subtask once we have non-empty templates cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4") subtask_data = {"start_time": None, @@ -768,7 +764,7 @@ def create_cleanup_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> # step 1: create subtask in defining state, with filled-in subtask_template subtask_template = SubtaskTemplate.objects.get(name='cleanup') - subtask_specs = get_default_json_object_for_schema(subtask_template.schema, cache=_schema_cache) + subtask_specs = 
subtask_template.get_default_json_document_for_schema() cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4") subtask_data = {"start_time": None, "stop_time": None, @@ -1179,15 +1175,17 @@ def schedule_qafile_subtask(qafile_subtask: Subtask): # step 4: create output dataproducts, and link these to the output # TODO: Should the output and/or dataproduct be determined by the specification in task_relation_blueprint? if qafile_subtask.outputs.first(): + dp_spec_template = DataproductSpecificationsTemplate.objects.get(name="empty") + dp_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty") qafile_subtask_dataproduct = Dataproduct.objects.create(filename="L%s_QA.h5" % (qa_input.producer.subtask_id, ), directory="/data/qa/qa_files", dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_HDF5.value), datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value), # todo: is this correct? producer=qafile_subtask.outputs.first(), - specifications_doc=get_default_json_object_for_schema(DataproductSpecificationsTemplate.objects.get(name="empty").schema, cache=_schema_cache), - specifications_template=DataproductSpecificationsTemplate.objects.get(name="empty"), - feedback_doc=get_default_json_object_for_schema(DataproductFeedbackTemplate.objects.get(name="empty").schema, cache=_schema_cache), - feedback_template=DataproductFeedbackTemplate.objects.get(name="empty"), + specifications_doc=dp_spec_template.get_default_json_document_for_schema(), + specifications_template=dp_spec_template, + feedback_doc=dp_feedback_template.get_default_json_document_for_schema(), + feedback_template=dp_feedback_template, sap=None # todo: do we need to point to a SAP here? Of which dataproduct then? ) qafile_subtask_dataproduct.save() @@ -1233,14 +1231,17 @@ def schedule_qaplots_subtask(qaplots_subtask: Subtask): # TODO: Should the output and/or dataproduct be determined by the specification in task_relation_blueprint? 
qafile_subtask = qaplots_subtask.predecessors.first() obs_subtask = qafile_subtask.predecessors.first() + dp_spec_template = DataproductSpecificationsTemplate.objects.get(name="empty") + dp_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty") + qaplots_subtask_dataproduct = Dataproduct.objects.create(directory="/data/qa/plots/L%s" % (obs_subtask.id, ), dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_PLOTS.value), datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value), # todo: is this correct? producer=qaplots_subtask.outputs.first(), - specifications_doc=get_default_json_object_for_schema(DataproductSpecificationsTemplate.objects.get(name="empty").schema, cache=_schema_cache), - specifications_template=DataproductSpecificationsTemplate.objects.get(name="empty"), - feedback_doc=get_default_json_object_for_schema(DataproductFeedbackTemplate.objects.get(name="empty").schema, cache=_schema_cache), - feedback_template=DataproductFeedbackTemplate.objects.get(name="empty"), + specifications_doc=dp_spec_template.get_default_json_document_for_schema(), + specifications_template=dp_spec_template, + feedback_doc=dp_feedback_template.get_default_json_document_for_schema(), + feedback_template=dp_feedback_template, sap=None # todo: do we need to point to a SAP here? Of which dataproduct then? 
) qaplots_subtask_dataproduct.save() @@ -1344,7 +1345,7 @@ def schedule_observation_subtask(observation_subtask: Subtask): specifications_doc = observation_subtask.specifications_doc dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="SAP") dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty") - dataproduct_feedback_doc = get_default_json_object_for_schema(dataproduct_feedback_template.schema, cache=_schema_cache) + dataproduct_feedback_doc = dataproduct_feedback_template.get_default_json_document_for_schema() # select correct output for each pointing based on name @@ -1445,7 +1446,7 @@ def schedule_observation_subtask(observation_subtask: Subtask): "identifiers": {"sap_index": sap_nr, "pipeline_index": pipeline_nr, "tab_index": tab_nr, "stokes_index": stokes_nr, "part_index": part_nr} }, specifications_template=dataproduct_specifications_template_timeseries, - feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema, cache=_schema_cache), + feedback_doc=dataproduct_feedback_template.get_default_json_document_for_schema(), feedback_template=dataproduct_feedback_template, size=0, expected_size=1024*1024*1024*tab_nr, @@ -1515,7 +1516,7 @@ def _create_preprocessing_output_dataproducts_and_transforms(pipeline_subtask: S producer=pipeline_subtask_output, specifications_doc=input_dp.specifications_doc, specifications_template=dataproduct_specifications_template, - feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema, cache=_schema_cache), + feedback_doc=dataproduct_feedback_template.get_default_json_document_for_schema(), feedback_template=dataproduct_feedback_template, sap=input_dp.sap, global_identifier=None) for input_dp in input_dataproducts] @@ -1614,7 +1615,7 @@ def _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask: producer=pipeline_subtask_output, specifications_doc=input_dp.specifications_doc, 
specifications_template=dataproduct_specifications_template, - feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema, cache=_schema_cache), + feedback_doc=dataproduct_feedback_template.get_default_json_document_for_schema(), feedback_template=dataproduct_feedback_template, sap=input_dp.sap, global_identifier=None) @@ -1660,7 +1661,7 @@ def _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask: producer=pipeline_subtask_output, specifications_doc={ "coherent": data_type != "is", "identifiers": { "data_type": data_type } }, specifications_template=dataproduct_specifications_template, - feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema, cache=_schema_cache), + feedback_doc=dataproduct_feedback_template.get_default_json_document_for_schema(), feedback_template=dataproduct_feedback_template, sap=None, # TODO: Can we say anything here, as summaries cover all SAPs global_identifier=None) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py index fe9b593331369d330531b39153750391a0b303b4..eaa94ffa1aba63a94e07dc141247e066bc43eebe 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py @@ -7,7 +7,6 @@ from lofar.common.datetimeutils import round_to_minute_precision from functools import cmp_to_key import os from copy import deepcopy -from lofar.common.json_utils import add_defaults_to_json_object_for_schema, get_default_json_object_for_schema import logging from datetime import datetime, timedelta from django.db.utils import IntegrityError @@ -17,13 +16,10 @@ from lofar.common.util import dict_with_overrides, subdict_of_pointer_items logger = logging.getLogger(__name__) -# cache for json schema's -_schema_cache = {} - def create_scheduling_unit_draft_from_observing_strategy_template(strategy_template: models.SchedulingUnitObservingStrategyTemplate, scheduling_set: models.SchedulingSet, name: 
str, description: str=None, specifications_doc_overrides: dict=None) -> models.SchedulingUnitDraft: '''create a new SchedulingUnitDraft from the given strategy_template with 'parent' scheduling_set''' - specifications_doc = add_defaults_to_json_object_for_schema(strategy_template.template, strategy_template.scheduling_unit_template.schema) + specifications_doc = strategy_template.template_doc_complete_with_defaults # apply overrides on template specifications_doc if given if specifications_doc_overrides: @@ -110,7 +106,7 @@ def copy_scheduling_unit_draft(scheduling_unit_draft: models.SchedulingUnitDraft logger.info("copy_scheduling_unit_draft(scheduling_unit_draft.id=%s) created copy_scheduling_unit_draft id=%s", scheduling_unit_draft.pk, scheduling_unit_draft_copy.pk) - # instantiate a copy of the taks graph + # instantiate a copy of the task graph scheduling_unit_draft_copy = update_task_graph_from_specifications_doc(scheduling_unit_draft_copy, scheduling_unit_draft.specifications_doc) return scheduling_unit_draft_copy @@ -230,7 +226,7 @@ def update_task_graph_from_specifications_doc(scheduling_unit_draft: models.Sche task_template = models.TaskTemplate.objects.get(name=task_template_name) task_specifications_doc = task_definition["specifications_doc"] - task_specifications_doc = add_defaults_to_json_object_for_schema(task_specifications_doc, task_template.schema, cache=_schema_cache) + task_specifications_doc = task_template.add_defaults_to_json_object_for_schema(task_specifications_doc) logger.debug("creating/updating task draft... task_name='%s', task_template_name='%s'", task_template_name, task_template_name) @@ -546,7 +542,7 @@ def create_cleanuptask_for_scheduling_unit_blueprint(scheduling_unit_blueprint: with transaction.atomic(): # create a cleanup task draft and blueprint.... 
cleanup_template = models.TaskTemplate.objects.get(name="cleanup") - cleanup_spec_doc = get_default_json_object_for_schema(cleanup_template.schema, cache=_schema_cache) + cleanup_spec_doc = cleanup_template.get_default_json_document_for_schema() cleanup_task_draft = models.TaskDraft.objects.create( name="Cleanup", @@ -568,7 +564,7 @@ def create_cleanuptask_for_scheduling_unit_blueprint(scheduling_unit_blueprint: # ... and connect the outputs of the producing tasks to the cleanup, so the cleanup task knows what to remove. selection_template = TaskRelationSelectionTemplate.objects.get(name="all") - selection_doc = get_default_json_object_for_schema(selection_template.schema, cache=_schema_cache) + selection_doc = selection_template.get_default_json_document_for_schema() for producer_task_blueprint in scheduling_unit_blueprint.task_blueprints.exclude(specifications_template__type=TaskType.Choices.CLEANUP).exclude(specifications_template__type=TaskType.Choices.INGEST).all(): for connector_type in producer_task_blueprint.specifications_template.output_connector_types.filter(iotype__value=IOType.Choices.OUTPUT.value).all(): diff --git a/SAS/TMSS/backend/test/t_subtasks.py b/SAS/TMSS/backend/test/t_subtasks.py index 4eb98001d73f6e511d8436aba12354aa06494f4b..d8bb40f5f41a9064658642a60f652edb774e29cb 100755 --- a/SAS/TMSS/backend/test/t_subtasks.py +++ b/SAS/TMSS/backend/test/t_subtasks.py @@ -74,7 +74,7 @@ def create_task_blueprint_object_for_testing(task_template_name="target observat :return: task_blueprint_obj: Created Task Blueprint object """ task_template = models.TaskTemplate.objects.get(name=task_template_name) - task_spec = add_defaults_to_json_object_for_schema(task_spec or {}, task_template.schema) + task_spec = task_template.add_defaults_to_json_object_for_schema(task_spec or {}) if 'QA' in task_spec: task_spec["QA"]['plots']['enabled'] = QA_enabled task_spec["QA"]['file_conversion']['enabled'] = QA_enabled