From 843e8068f46fa25d67d1d595c978bdfa99a13a35 Mon Sep 17 00:00:00 2001 From: jkuensem <jkuensem@physik.uni-bielefeld.de> Date: Fri, 16 Oct 2020 10:29:34 +0200 Subject: [PATCH] TMSS-315: Enable SAP spec validation and more testing --- .../src/tmss/tmssapp/models/scheduling.py | 4 + .../tmss/tmssapp/schemas/sap_template-1.json | 9 +- SAS/TMSS/src/tmss/tmssapp/subtasks.py | 26 +++++- SAS/TMSS/test/t_scheduling.py | 85 ++++++++++++++++++- 4 files changed, 117 insertions(+), 7 deletions(-) diff --git a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py index 0d9532f2889..804e65936fb 100644 --- a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py @@ -263,6 +263,10 @@ class SAP(BasicCommon): specifications_doc = JSONField(help_text='SAP properties.') specifications_template = ForeignKey('SAPTemplate', null=False, on_delete=CASCADE, help_text='Schema used for specifications_doc.') + def save(self, force_insert=False, force_update=False, using=None, update_fields=None): + annotate_validate_add_defaults_to_doc_using_template(self, 'specifications_doc', 'specifications_template') + + super().save(force_insert, force_update, using, update_fields) class Dataproduct(BasicCommon): """ diff --git a/SAS/TMSS/src/tmss/tmssapp/schemas/sap_template-1.json b/SAS/TMSS/src/tmss/tmssapp/schemas/sap_template-1.json index 320f8ffeaea..cae98bf2322 100644 --- a/SAS/TMSS/src/tmss/tmssapp/schemas/sap_template-1.json +++ b/SAS/TMSS/src/tmss/tmssapp/schemas/sap_template-1.json @@ -1,5 +1,7 @@ { + "$id":"http://tmss.lofar.org/api/schemas/saptemplate/sap/1#", "$schema": "http://json-schema.org/draft-06/schema#", + "title": "SAP", "type": "object", "properties": { "identifiers": { @@ -34,13 +36,17 @@ }, "angle2": { "type": "number" + }, + "angle3": { + "type": "number" } }, "additionalProperties": false, "required": [ "direction_type", "angle1", - "angle2" + "angle2", + "angle3" ] }, "time": { @@ -117,7 +123,6 @@ ] } }, - "additionalProperties": false, "required": [ "identifiers", "name", diff --git a/SAS/TMSS/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/src/tmss/tmssapp/subtasks.py index 87189cc5cb1..cc4225c8241 100644 --- a/SAS/TMSS/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/src/tmss/tmssapp/subtasks.py @@ -551,7 +551,8 @@ def schedule_qafile_subtask(qafile_subtask: Subtask): specifications_doc=get_default_json_object_for_schema(DataproductSpecificationsTemplate.objects.get(name="empty").schema), specifications_template=DataproductSpecificationsTemplate.objects.get(name="empty"), feedback_doc=get_default_json_object_for_schema(DataproductFeedbackTemplate.objects.get(name="empty").schema), - feedback_template=DataproductFeedbackTemplate.objects.get(name="empty") + feedback_template=DataproductFeedbackTemplate.objects.get(name="empty"), + SAP=None # todo: do we need to point to a SAP here? ) # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it) @@ -602,7 +603,8 @@ def schedule_qaplots_subtask(qaplots_subtask: Subtask): specifications_doc=get_default_json_object_for_schema(DataproductSpecificationsTemplate.objects.get(name="empty").schema), specifications_template=DataproductSpecificationsTemplate.objects.get(name="empty"), feedback_doc=get_default_json_object_for_schema(DataproductFeedbackTemplate.objects.get(name="empty").schema), - feedback_template=DataproductFeedbackTemplate.objects.get(name="empty") + feedback_template=DataproductFeedbackTemplate.objects.get(name="empty"), + SAP=None # todo: do we need to point to a SAP here? ) # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it) @@ -713,6 +715,20 @@ def schedule_observation_subtask(observation_subtask: Subtask): observation_subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project.name, observation_subtask.id) for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings']): + sap_name = "%s_%s" % (observation_subtask.id, pointing['name']) + sap = SAP.objects.create(name=sap_name, + specifications_doc={ "name": sap_name, + "identifiers": {}, # todo: TMSS-324 + "pointing": pointing['pointing'], + "time": {"start_time": observation_subtask.start_time.isoformat(), + "duration": (observation_subtask.stop_time - observation_subtask.start_time).total_seconds()}, + "antennas": { + "set": specifications_doc['stations']['antenna_set'], + "stations": specifications_doc['stations']['station_list'], + "fields": [] # todo: do we really have to calculate an object like {"station": "CS001", "field": "HBA", "type": "HBA"} here again for all involved stations? Isn't that info derived later anyway? + } + }, + specifications_template=SAPTemplate.objects.get(name="SAP")) for sb_nr in pointing['subbands']: Dataproduct.objects.create(filename="L%d_SAP%03d_SB%03d_uv.MS" % (observation_subtask.id, sap_nr, sb_nr), directory=directory, @@ -724,7 +740,8 @@ def schedule_observation_subtask(observation_subtask: Subtask): feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema), feedback_template=dataproduct_feedback_template, size=0 if sb_nr%10==0 else 1024*1024*1024*sb_nr, - expected_size=1024*1024*1024*sb_nr) + expected_size=1024*1024*1024*sb_nr, + SAP=sap) # step 4: resource assigner (if possible) _assign_resources(observation_subtask) @@ -803,7 +820,8 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask): specifications_doc={}, specifications_template=dataproduct_specifications_template, feedback_doc="", - feedback_template=dataproduct_feedback_template) + feedback_template=dataproduct_feedback_template, + SAP=input_dp.SAP) DataproductTransform.objects.create(input=input_dp, output=output_dp, identity=False) output_dps.append(output_dp) diff --git a/SAS/TMSS/test/t_scheduling.py b/SAS/TMSS/test/t_scheduling.py index fa64b627ef4..85a77ee0b46 100755 --- a/SAS/TMSS/test/t_scheduling.py +++ b/SAS/TMSS/test/t_scheduling.py @@ -278,7 +278,7 @@ class SubtaskInputOutputTest(unittest.TestCase): dp2_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': [0]})) dp2_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': [1]})) - # uncomment when RA scheduler works + # todo: uncomment when RA scheduler works # # trigger: # # schedule pipeline, which should attach the correct subset of dataproducts to the pipeline inputs # schedule_pipeline_subtask(pipe_st) @@ -287,6 +287,89 @@ class SubtaskInputOutputTest(unittest.TestCase): # # check correct input filtering # self.assertEqual(set(pipe_in1.dataproducts.all()), {dp1_1, dp1_3}) # self.assertEqual(set(pipe_in2.dataproducts.all()), {dp2_2}) + ''' + 90: ====================================================================== + 90: ERROR: test_schedule_pipeline_subtask_filters_predecessor_output_dataproducts_for_input (__main__.SubtaskInputOutputTest) + 90: ---------------------------------------------------------------------- + 90: Traceback (most recent call last): + 90: File "t_scheduling.py", line 284, in test_schedule_pipeline_subtask_filters_predecessor_output_dataproducts_for_input + 90: schedule_pipeline_subtask(pipe_st) + 90: File "/lofar/build/gnucxx11_opt/lib64/python3.6/site-packages/lofar/sas/tmss/tmss/tmssapp/subtasks.py", line 831, in schedule_pipeline_subtask + 90: _assign_resources(pipeline_subtask) + 90: File "/lofar/build/gnucxx11_opt/lib64/python3.6/site-packages/lofar/sas/tmss/tmss/tmssapp/subtasks.py", line 512, in _assign_resources + 90: raise SubtaskSchedulingException("Cannot schedule subtask id=%d because the required resources are not (fully) available." % (subtask.pk, )) + 90: lofar.sas.tmss.tmss.exceptions.SubtaskSchedulingException: Cannot schedule subtask id=2000012 because the required resources are not (fully) available. + 90: + 90: ---------------------------------------------------------------------- + ''' + +class SAPTest(unittest.TestCase): + """ + SAP test + These testcases are located in the t_scheduling module, because the SAP entries are created/assigned during scheduling + """ + + def setUp(self) -> None: + # make sure we're allowed to schedule + setting = Setting.objects.get(name='allow_scheduling_observations') + setting.value = True + setting.save() + + def test_schedule_observation_subtask_creates_sap_with_correct_pointing(self): + with tmss_test_env.create_tmss_client() as client: + subtask_template = client.get_subtask_template("observation control") + spec = get_default_json_object_for_schema(subtask_template['schema']) + spec['stations']['digital_pointings'][0]['subbands'] = [0] + cluster_url = client.get_path_as_json_object('/cluster/1')['url'] + pointing = {"angle1": 7.6, "angle2": 5.4, "direction_type": "J2000"} + spec['stations']['digital_pointings'][0]['pointing'] = pointing + + subtask_data = test_data_creator.Subtask(specifications_template_url=subtask_template['url'], + specifications_doc=spec, + cluster_url = cluster_url, + start_time=datetime.utcnow() + timedelta(minutes=5), + stop_time=datetime.utcnow() + timedelta(minutes=15)) + subtask = test_data_creator.post_data_and_get_response_as_json_object(subtask_data, '/subtask/') + subtask_id = subtask['id'] + test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=subtask['url']), + '/subtask_output/') + + sap_count_before_scheduling = models.SAP.objects.count() + client.set_subtask_status(subtask_id, 'defined') + subtask = client.schedule_subtask(subtask_id) + + self.assertGreater(models.SAP.objects.count(), sap_count_before_scheduling) + self.assertEqual(models.SAP.objects.first().specifications_doc['pointing']['angle1'], pointing['angle1']) + self.assertEqual(models.SAP.objects.first().specifications_doc['pointing']['angle2'], pointing['angle2']) + + + def test_schedule_pipeline_subtask_copies_sap_from_input_to_output(self): + # setup: + # create observation subtask and outputs and dataproducts + obs_st = create_subtask_object_for_testing('observation', 'finished') + obs_out = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=obs_st)) + + # create connected pipeline subtask and inputs, specify input filtering + pipe_st = create_subtask_object_for_testing('pipeline', 'defined') + pipe_out = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=pipe_st)) # required by scheduling function + pipe_in = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=pipe_st, producer=obs_out)) + + # create obs output dataproducts + dp1_in = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out)) + dp2_in = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out)) + + # todo: uncomment when RA scheduler works + # # schedule pipeline, which should copy the SAP + # schedule_pipeline_subtask(pipe_st) + # + # # determine the newly created pipeline dataproducts + # dp1_out = DataproductTransform.objects.get(input=dp1_in).output + # dp2_out = DataproductTransform.objects.get(input=dp2_in).output + # + # # assert: + # self.assertEqual(dp1_in.SAP, dp1_out.SAP) + # self.assertEqual(dp2_in.SAP, dp2_out.SAP) + if __name__ == "__main__": -- GitLab