From fb3f8f71d39002c95f1eded5497ad8ef8a86ddcc Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Mon, 26 Apr 2021 17:40:16 +0200 Subject: [PATCH] TMSS-717: speed improvement, use caching --- LCS/PyCommon/json_utils.py | 4 +- .../backend/src/tmss/tmssapp/models/common.py | 2 +- SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py | 39 ++++++++++--------- SAS/TMSS/backend/src/tmss/tmssapp/tasks.py | 11 ++++-- 4 files changed, 32 insertions(+), 24 deletions(-) diff --git a/LCS/PyCommon/json_utils.py b/LCS/PyCommon/json_utils.py index 74dcfa1db09..6a40f670614 100644 --- a/LCS/PyCommon/json_utils.py +++ b/LCS/PyCommon/json_utils.py @@ -273,13 +273,13 @@ def validate_json_against_schema(json_string: str, schema: str): raise jsonschema.exceptions.ValidationError(str(e)) -def get_default_json_object_for_schema(schema: str) -> dict: +def get_default_json_object_for_schema(schema: str, cache: dict=None, max_cache_age: timedelta=DEFAULT_MAX_SCHEMA_CACHE_AGE) -> dict: """ TMSS wrapper for TMSS 'add_defaults_to_json_object_for_schema' :param schema: :return: json_object with default values of the schema """ - data = add_defaults_to_json_object_for_schema({}, schema) + data = add_defaults_to_json_object_for_schema({}, schema, cache=cache, max_cache_age=max_cache_age) if '$id' in schema: data['$schema'] = schema['$id'] return data diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/common.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/common.py index 4eeeb68e1a4..b13765d5a07 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/common.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/common.py @@ -203,7 +203,7 @@ class TemplateSchemaMixin(): # add defaults for missing properies, and validate on the fly # use the class's _schema_cache - document = add_defaults_to_json_object_for_schema(document, template.schema, self._schema_cache) + document = add_defaults_to_json_object_for_schema(document, template.schema, cache=self._schema_cache, max_cache_age=self._MAX_SCHEMA_CACHE_AGE) # update the model instance with the updated and validated document setattr(self, document_attr, document) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index e99dd864d74..116fdbe86a2 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -33,6 +33,9 @@ from lofar.sas.tmss.tmss.tmssapp.conversions import antennafields_for_antennaset from lofar.sas.tmss.tmss.exceptions import TMSSException from django.db import transaction +# cache for json schema's +_schema_cache = {} + # ==== various create* methods to convert/create a TaskBlueprint into one or more Subtasks ==== def check_prerequities_for_subtask_creation(task_blueprint: TaskBlueprint) -> bool: @@ -155,7 +158,7 @@ def create_observation_subtask_specifications_from_observation_task_blueprint(ta # start with an observation subtask specification with all the defaults and the right structure according to the schema subtask_template = SubtaskTemplate.objects.get(name='observation control') - subtask_spec = get_default_json_object_for_schema(subtask_template.schema) + subtask_spec = get_default_json_object_for_schema(subtask_template.schema, cache=_schema_cache) # wipe the default pointings, these should come from the task_spec subtask_spec['stations'].pop('analog_pointing', None) @@ -534,7 +537,7 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) # step 1: create subtask in defining state, with filled-in subtask_template qafile_subtask_template = SubtaskTemplate.objects.get(name="QA file conversion") - qafile_subtask_spec = add_defaults_to_json_object_for_schema({}, qafile_subtask_template.schema) + qafile_subtask_spec = add_defaults_to_json_object_for_schema({}, qafile_subtask_template.schema, cache=_schema_cache) qafile_subtask_spec['nr_of_subbands'] = obs_task_qafile_spec.get("nr_of_subbands") qafile_subtask_spec['nr_of_timestamps'] = obs_task_qafile_spec.get("nr_of_timestamps") validate_json_against_schema(qafile_subtask_spec, qafile_subtask_template.schema) @@ -551,7 +554,7 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) # step 2: create and link subtask input/output selection_template = TaskRelationSelectionTemplate.objects.get(name="all") - selection_doc = get_default_json_object_for_schema(selection_template.schema) + selection_doc = get_default_json_object_for_schema(selection_template.schema, cache=_schema_cache) for obs_out in observation_subtask.outputs.all(): qafile_subtask_input = SubtaskInput.objects.create(subtask=qafile_subtask, @@ -615,7 +618,7 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta # step 1: create subtask in defining state, with filled-in subtask_template qaplots_subtask_template = SubtaskTemplate.objects.get(name="QA plots") - qaplots_subtask_spec_doc = add_defaults_to_json_object_for_schema({}, qaplots_subtask_template.schema) + qaplots_subtask_spec_doc = add_defaults_to_json_object_for_schema({}, qaplots_subtask_template.schema, cache=_schema_cache) qaplots_subtask_spec_doc['autocorrelation'] = obs_task_qaplots_spec.get("autocorrelation") qaplots_subtask_spec_doc['crosscorrelation'] = obs_task_qaplots_spec.get("crosscorrelation") validate_json_against_schema(qaplots_subtask_spec_doc, qaplots_subtask_template.schema) @@ -632,7 +635,7 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta # step 2: create and link subtask input/output selection_template = TaskRelationSelectionTemplate.objects.get(name="all") - selection_doc = get_default_json_object_for_schema(selection_template.schema) + selection_doc = get_default_json_object_for_schema(selection_template.schema, cache=_schema_cache) qaplots_subtask_input = SubtaskInput.objects.create(subtask=qaplots_subtask, producer=qafile_subtask.outputs.first(), selection_doc=selection_doc, @@ -667,8 +670,8 @@ def create_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint, s # step 1: create subtask in defining state, with filled-in subtask_template subtask_template = SubtaskTemplate.objects.get(name=subtask_template_name) - default_subtask_specs = get_default_json_object_for_schema(subtask_template.schema) - task_specs_with_defaults = add_defaults_to_json_object_for_schema(task_blueprint.specifications_doc, task_blueprint.specifications_template.schema) + default_subtask_specs = get_default_json_object_for_schema(subtask_template.schema, cache=_schema_cache) + task_specs_with_defaults = add_defaults_to_json_object_for_schema(task_blueprint.specifications_doc, task_blueprint.specifications_template.schema, cache=_schema_cache) subtask_specs = generate_subtask_specs_from_task_spec_func(task_specs_with_defaults, default_subtask_specs) cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4") @@ -723,7 +726,7 @@ def create_ingest_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> # step 1: create subtask in defining state, with filled-in subtask_template subtask_template = SubtaskTemplate.objects.get(name='ingest control') - default_subtask_specs = get_default_json_object_for_schema(subtask_template.schema) + default_subtask_specs = get_default_json_object_for_schema(subtask_template.schema, cache=_schema_cache) subtask_specs = default_subtask_specs # todo: translate specs from task to subtask once we have non-empty templates cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4") subtask_data = {"start_time": None, @@ -766,7 +769,7 @@ def create_cleanup_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> # step 1: create subtask in defining state, with filled-in subtask_template subtask_template = SubtaskTemplate.objects.get(name='cleanup') - subtask_specs = get_default_json_object_for_schema(subtask_template.schema) + subtask_specs = get_default_json_object_for_schema(subtask_template.schema, cache=_schema_cache) cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4") subtask_data = {"start_time": None, "stop_time": None, @@ -1170,9 +1173,9 @@ def schedule_qafile_subtask(qafile_subtask: Subtask): dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_HDF5.value), datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value), # todo: is this correct? producer=qafile_subtask.outputs.first(), - specifications_doc=get_default_json_object_for_schema(DataproductSpecificationsTemplate.objects.get(name="empty").schema), + specifications_doc=get_default_json_object_for_schema(DataproductSpecificationsTemplate.objects.get(name="empty").schema, cache=_schema_cache), specifications_template=DataproductSpecificationsTemplate.objects.get(name="empty"), - feedback_doc=get_default_json_object_for_schema(DataproductFeedbackTemplate.objects.get(name="empty").schema), + feedback_doc=get_default_json_object_for_schema(DataproductFeedbackTemplate.objects.get(name="empty").schema, cache=_schema_cache), feedback_template=DataproductFeedbackTemplate.objects.get(name="empty"), sap=None # todo: do we need to point to a SAP here? Of which dataproduct then? ) @@ -1223,9 +1226,9 @@ def schedule_qaplots_subtask(qaplots_subtask: Subtask): dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_PLOTS.value), datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value), # todo: is this correct? producer=qaplots_subtask.outputs.first(), - specifications_doc=get_default_json_object_for_schema(DataproductSpecificationsTemplate.objects.get(name="empty").schema), + specifications_doc=get_default_json_object_for_schema(DataproductSpecificationsTemplate.objects.get(name="empty").schema, cache=_schema_cache), specifications_template=DataproductSpecificationsTemplate.objects.get(name="empty"), - feedback_doc=get_default_json_object_for_schema(DataproductFeedbackTemplate.objects.get(name="empty").schema), + feedback_doc=get_default_json_object_for_schema(DataproductFeedbackTemplate.objects.get(name="empty").schema, cache=_schema_cache), feedback_template=DataproductFeedbackTemplate.objects.get(name="empty"), sap=None # todo: do we need to point to a SAP here? Of which dataproduct then? ) @@ -1336,7 +1339,7 @@ def schedule_observation_subtask(observation_subtask: Subtask): specifications_doc = observation_subtask.specifications_doc dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="SAP") dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty") - dataproduct_feedback_doc = get_default_json_object_for_schema(dataproduct_feedback_template.schema) + dataproduct_feedback_doc = get_default_json_object_for_schema(dataproduct_feedback_template.schema, cache=_schema_cache) # select correct output for each pointing based on name @@ -1433,7 +1436,7 @@ def schedule_observation_subtask(observation_subtask: Subtask): producer=observation_subtask.outputs.first(), # todo: select correct output. I tried "subtask_output_dict[sap['name']]" but tests fail because the sap's name is not in the task blueprint. Maybe it's just test setup and this should work? specifications_doc={"sap": specifications_doc['stations']['digital_pointings'][sap_nr]["name"], "coherent": coherent, "identifiers": {"pipeline_index": pipeline_nr, "tab_index": tab_nr, "stokes_index": stokes_nr, "part_index": part_nr}}, specifications_template=dataproduct_specifications_template_timeseries, - feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema), + feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema, cache=_schema_cache), feedback_template=dataproduct_feedback_template, size=0, expected_size=1024*1024*1024*tab_nr, @@ -1503,7 +1506,7 @@ def _create_preprocessing_output_dataproducts_and_transforms(pipeline_subtask: S producer=pipeline_subtask_output, specifications_doc=input_dp.specifications_doc, specifications_template=dataproduct_specifications_template, - feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema), + feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema, cache=_schema_cache), feedback_template=dataproduct_feedback_template, sap=input_dp.sap, global_identifier=None) for input_dp in input_dataproducts] @@ -1537,7 +1540,7 @@ def _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask: producer=pipeline_subtask_output, specifications_doc=input_dp.specifications_doc, specifications_template=dataproduct_specifications_template, - feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema), + feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema, cache=_schema_cache), feedback_template=dataproduct_feedback_template, sap=input_dp.sap, global_identifier=None) for input_dp in input_dataproducts] @@ -1575,7 +1578,7 @@ def _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask: producer=pipeline_subtask_output, specifications_doc={ "coherent": is_coherent, "identifiers": { "obsid": obsid } }, specifications_template=dataproduct_specifications_template, - feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema), + feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema, cache=_schema_cache), feedback_template=dataproduct_feedback_template, sap=None, # TODO: Can we say anything here, as summaries cover all SAPs global_identifier=None) for (obsid, is_coherent) in summaries} diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py index a43d4d81c28..b729f07ecdd 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py @@ -15,6 +15,9 @@ from django.db import transaction logger = logging.getLogger(__name__) +# cache for json schema's +_schema_cache = {} + def create_scheduling_unit_blueprint_from_scheduling_unit_draft(scheduling_unit_draft: models.SchedulingUnitDraft) -> models.SchedulingUnitBlueprint: """ Create a SchedulingUnitBlueprint from the SchedulingUnitDraft @@ -146,12 +149,14 @@ def create_task_drafts_from_scheduling_unit_draft(scheduling_unit_draft: models. if len(scheduling_unit_draft.requirements_doc.get("tasks", {})) == 0: raise BlueprintCreationException("create_task_drafts_from_scheduling_unit_draft: scheduling_unit_draft.id=%s has no tasks defined in its requirements_doc" % (scheduling_unit_draft.pk,)) + schema_cache = {} + for task_name, task_definition in scheduling_unit_draft.requirements_doc["tasks"].items(): task_template_name = task_definition["specifications_template"] task_template = models.TaskTemplate.objects.get(name=task_template_name) task_specifications_doc = task_definition["specifications_doc"] - task_specifications_doc = add_defaults_to_json_object_for_schema(task_specifications_doc, task_template.schema) + task_specifications_doc = add_defaults_to_json_object_for_schema(task_specifications_doc, task_template.schema, cache=_schema_cache) try: logger.debug("creating task draft... task_name='%s', task_template_name='%s'", task_template_name, task_template_name) @@ -464,7 +469,7 @@ def create_cleanuptask_for_scheduling_unit_blueprint(scheduling_unit_blueprint: with transaction.atomic(): # create a cleanup task draft and blueprint.... cleanup_template = models.TaskTemplate.objects.get(name="cleanup") - cleanup_spec_doc = get_default_json_object_for_schema(cleanup_template.schema) + cleanup_spec_doc = get_default_json_object_for_schema(cleanup_template.schema, cache=_schema_cache) cleanup_task_draft = models.TaskDraft.objects.create( name="Cleanup", @@ -487,7 +492,7 @@ def create_cleanuptask_for_scheduling_unit_blueprint(scheduling_unit_blueprint: # ... and connect the outputs of the producing tasks to the cleanup, so the cleanup task knows what to remove. selection_template = TaskRelationSelectionTemplate.objects.get(name="all") - selection_doc = get_default_json_object_for_schema(selection_template.schema) + selection_doc = get_default_json_object_for_schema(selection_template.schema, cache=_schema_cache) for producer_task_blueprint in scheduling_unit_blueprint.task_blueprints.exclude(specifications_template__type=TaskType.Choices.CLEANUP).exclude(specifications_template__type=TaskType.Choices.INGEST).all(): for connector_type in producer_task_blueprint.specifications_template.output_connector_types.filter(iotype__value=IOType.Choices.OUTPUT.value).all(): -- GitLab