diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
index c694d3eebe2612d032a4d647aef8058935f73654..54a7cdc98a13bb8464c31e9b88069c09acddb97d 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
@@ -643,7 +643,7 @@ def _convert_to_parset_dict_for_pulsarpipeline_schema(subtask: models.Subtask) -> dict:
 
     # Dataproducts. NOTE: The pulsar pipeline doesn't actually use this information, and reads input/writes output as it pleases.
     inputs = subtask.inputs.all()
-    in_dataproducts = sum([subtasK_input.dataproducts.all() for subtask_input in subtask.inputs.all()], [])
+    in_dataproducts = sum([list(subtask_input.dataproducts.all()) for subtask_input in inputs], [])
     coherent_in_dataproducts = [dp for dp in in_dataproducts if dp.specifications_doc["coherent"]]
     incoherent_in_dataproducts = [dp for dp in in_dataproducts if not dp.specifications_doc["coherent"]]
 
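Note on the parset.py change above: the replaced line had two defects. The misspelled subtasK_input raised a NameError as soon as the comprehension iterated, and summing Django QuerySets fails anyway, because sum() concatenates with the + operator and a list cannot be added to a QuerySet. Materializing each QuerySet with list() first makes the flattening valid. A minimal sketch of the idiom, with plain lists standing in for the per-input QuerySets (the values are illustrative only):

    # each inner list stands in for one subtask_input.dataproducts.all() QuerySet
    per_input_dataproducts = [["dp0", "dp1"], ["dp2"], ["dp3", "dp4"]]

    # sum(..., []) concatenates lists, so each QuerySet must be materialized first
    flat = sum([list(dps) for dps in per_input_dataproducts], [])
    assert flat == ["dp0", "dp1", "dp2", "dp3", "dp4"]

    # itertools.chain.from_iterable does the same in O(n) and is the more common idiom
    from itertools import chain
    assert list(chain.from_iterable(per_input_dataproducts)) == flat
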
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
index 20600fcd76ca56394001e385046a59295e3af280..acdf6a50ae6b92b19c43d764e32bf95258b98338 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
@@ -1358,9 +1358,8 @@ def _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask:
 
     # ----- output tarball per input dataproduct
     # input:output mapping is 1:1
-    input_dps = list(pipeline_subtask_input.dataproducts.all())
     output_dp_objects = []
-    for input_dp in input_dps:
+    for input_dp in input_dataproducts:
         is_coherent = input_dp.specifications_doc["coherent"]
 
         output_dp = Dataproduct(filename="%s.tar" % (splitext(input_dp.filename)[0],), # .h5 -> .tar
@@ -1381,7 +1380,7 @@
     output_dps = _bulk_create_dataproducts_with_global_identifiers(output_dp_objects)
     pipeline_subtask_output.dataproducts.set(output_dps)
 
-    transforms = [DataproductTransform(input=input_dp, output=output_dp, identity=False) for input_dp,output_dp in zip(input_dps, output_dps)]
+    transforms = [DataproductTransform(input=input_dp, output=output_dp, identity=False) for input_dp,output_dp in zip(input_dataproducts, output_dps)]
     DataproductTransform.objects.bulk_create(transforms)
 
     # ----- summary tarballs
@@ -1399,7 +1398,7 @@
         return dataproduct.filename.split("_")[0]
 
     # construct how input dataproducts map onto the summaries
-    summary_mapping = {dp: (dp_obsid(dp), dp.specifications_doc["coherent"]) for dp in input_dps}
+    summary_mapping = {dp: (dp_obsid(dp), dp.specifications_doc["coherent"]) for dp in input_dataproducts}
     summaries = set(summary_mapping.values())
 
     summary_dps = {(obsid, is_coherent): Dataproduct(filename="L%s_summary%s.tar" % (obsid, "CS" if is_coherent else "IS"),
@@ -1411,12 +1410,12 @@
                                                      specifications_template=dataproduct_specifications_template,
                                                      feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
                                                      feedback_template=dataproduct_feedback_template,
-                                                     sap=input_dp.sap,
+                                                     sap=None, # TODO: Can we say anything here, as summaries cover all SAPs
                                                      global_identifier=None) for (obsid, is_coherent) in summaries}
 
     # create the dataproducts
     _bulk_create_dataproducts_with_global_identifiers(summary_dps.values())
-    pipeline_subtask_output.dataproducts.add(summary_dps.values())
+    pipeline_subtask_output.dataproducts.add(*summary_dps.values())
 
     # populate the transform, each input_dp is input for its corresponding summary
     transforms = [DataproductTransform(input=input_dp, output=summary_dps[(obsid, is_coherent)], identity=False) for (input_dp, (obsid, is_coherent)) in summary_mapping.items()]
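Note on the subtasks.py changes above: the code now consistently uses input_dataproducts, defined earlier in the function (outside these hunks), instead of the removed local input_dps, and the summary step groups every input dataproduct by the pair (observation id, coherent flag), creating exactly one summaryCS/summaryIS tarball per group. Setting sap=None is also more defensible than the old sap=input_dp.sap, which leaked the loop variable from the earlier tarball loop even though a summary spans all SAPs. Finally, the .add(*summary_dps.values()) fix is needed because Django's RelatedManager.add() takes model instances as separate positional arguments, not a single iterable. A minimal sketch of the grouping logic, with tuples standing in for Dataproduct rows and a simplified stand-in for the patch's dp_obsid() helper (filenames are made up):

    # (filename, coherent) pairs standing in for the pipeline's input dataproducts
    inputs = [("L123456_SAP000_B000_S0_P000.h5", True),
              ("L123456_SAP000_B001_S0_P000.h5", True),
              ("L123456_SAP000_B002_S0_P000.h5", False)]

    def dp_obsid(filename):
        # simplified: take the leading observation-id part of the filename
        return filename.split("_")[0]

    # map every input onto its summary key, then reduce to the distinct summaries
    summary_mapping = {fn: (dp_obsid(fn), coherent) for fn, coherent in inputs}
    summaries = set(summary_mapping.values())
    assert summaries == {("L123456", True), ("L123456", False)}  # one CS and one IS tarball
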
diff --git a/SAS/TMSS/backend/test/t_scheduling.py b/SAS/TMSS/backend/test/t_scheduling.py
index 5bcfa16e9e29e9e82b75a3c5f13dff663a89289d..c135fb25540791a8ec5cec3a797ed69a1f04b72d 100755
--- a/SAS/TMSS/backend/test/t_scheduling.py
+++ b/SAS/TMSS/backend/test/t_scheduling.py
@@ -156,7 +156,8 @@ class SchedulingTest(unittest.TestCase):
         self.assertEqual([], duplicate_dataproduct_specification_docs)
 
     def test_schedule_observation_subtask_with_enough_resources_available(self):
-        spec = { "stations": { "digital_pointings": [ { "subbands": [0] } ] } }
+        spec = { "stations": { "digital_pointings": [ { "subbands": [0] } ] },
+                 "COBALT": { "correlator": { "enabled": True } } }
         self._test_schedule_observation_subtask_with_enough_resources_available(spec)
 
     def test_schedule_beamformer_observation_subtask_with_enough_resources_available(self):
@@ -189,6 +190,7 @@ class SchedulingTest(unittest.TestCase):
         task_blueprint = test_data_creator.post_data_and_get_response_as_json_object(task_blueprint_data, '/task_blueprint/')
         subtask_template = client.get_subtask_template("observation control")
         spec = get_default_json_object_for_schema(subtask_template['schema'])
+        spec['COBALT']['correlator']['enabled'] = True
         spec['stations']['digital_pointings'][0]['subbands'] = [0]
         cluster_url = client.get_path_as_json_object('/cluster/1')['url']
 
@@ -224,6 +226,7 @@
 
         subtask_template = client.get_subtask_template("observation control")
         spec = get_default_json_object_for_schema(subtask_template['schema'])
+        spec['COBALT']['correlator']['enabled'] = True
         spec['stations']['digital_pointings'][0]['subbands'] = [0]
         spec['stations']['station_list'] = ['CS001', 'CS002', 'CS401']
 
@@ -262,9 +265,10 @@
         task_blueprint = test_data_creator.post_data_and_get_response_as_json_object(task_blueprint_data,'/task_blueprint/')
         subtask_template = client.get_subtask_template("observation control")
         spec = get_default_json_object_for_schema(subtask_template['schema'])
+        spec['COBALT']['correlator']['enabled'] = True
         spec['stations']['digital_pointings'][0]['subbands'] = [0]
-        cluster_url = client.get_path_as_json_object('/cluster/1')['url']
         spec['stations']['station_list'] = ['CS001', 'CS002', 'CS003']
+        cluster_url = client.get_path_as_json_object('/cluster/1')['url']
 
         subtask_data = test_data_creator.Subtask(specifications_template_url=subtask_template['url'],
                                                  specifications_doc=spec,
                                                  cluster_url=cluster_url,
@@ -281,16 +285,13 @@ class SchedulingTest(unittest.TestCase):
         self.assertEqual('scheduled', subtask['state_value'])
         self.assertEqual('scheduled', tmss_test_env.ra_test_environment.radb.getTask(tmss_id=subtask_id)['status'])
 
-    def test_schedule_pipeline_subtask_with_enough_resources_available(self):
-        with tmss_test_env.create_tmss_client() as client:
+    def _setup_observation_and_pipeline(self, client, obs_spec, dataproduct_properties, pipeline_task_template_name, pipeline_subtask_template_name, pipeline_subtask_spec):
         cluster_url = client.get_path_as_json_object('/cluster/1')['url']
 
         # setup: first create an observation, so the pipeline can have input.
         obs_task_blueprint_data = test_data_creator.TaskBlueprint(template_url=client.get_task_template(name="target observation")['url'])
         obs_task_blueprint = test_data_creator.post_data_and_get_response_as_json_object(obs_task_blueprint_data, '/task_blueprint/')
         obs_subtask_template = client.get_subtask_template("observation control")
-        obs_spec = get_default_json_object_for_schema(obs_subtask_template['schema'])
-        obs_spec['stations']['digital_pointings'][0]['subbands'] = [0]
 
         obs_subtask_data = test_data_creator.Subtask(specifications_template_url=obs_subtask_template['url'],
                                                      specifications_doc=obs_spec,
@@ -298,16 +299,14 @@
                                                      task_blueprint_url=obs_task_blueprint['url'])
         obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/')
         obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/')
-        test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'],
-                                                                              specifications_doc={"sap": "target0", "subband": 0 },
-                                                                              subtask_output_url=obs_subtask_output_url), '/dataproduct/')
+        test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(**dataproduct_properties, subtask_output_url=obs_subtask_output_url), '/dataproduct/')
 
         # now create the pipeline...
-        pipe_task_blueprint_data = test_data_creator.TaskBlueprint(template_url=client.get_task_template(name="preprocessing pipeline")['url'])
+        pipe_task_blueprint_data = test_data_creator.TaskBlueprint(template_url=client.get_task_template(name=pipeline_task_template_name)['url'])
         pipe_task_blueprint = test_data_creator.post_data_and_get_response_as_json_object(pipe_task_blueprint_data, '/task_blueprint/')
-        pipe_subtask_template = client.get_subtask_template("pipeline control")
-        pipe_spec = get_default_json_object_for_schema(pipe_subtask_template['schema'])
+        pipe_subtask_template = client.get_subtask_template(pipeline_subtask_template_name)
+        pipe_spec = add_defaults_to_json_object_for_schema(pipeline_subtask_spec, pipe_subtask_template['schema'])
 
         pipe_subtask_data = test_data_creator.Subtask(specifications_template_url=pipe_subtask_template['url'],
                                                       specifications_doc=pipe_spec,
                                                       cluster_url=cluster_url,
@@ -323,6 +322,56 @@
             client.set_subtask_status(predecessor['id'], 'finished')
         client.set_subtask_status(pipe_subtask['id'], 'defined')
+
+        return pipe_subtask
+
+    def test_schedule_preprocessing_pipeline_subtask_with_enough_resources_available(self):
+        with tmss_test_env.create_tmss_client() as client:
+            obs_subtask_template = client.get_subtask_template("observation control")
+            obs_spec = get_default_json_object_for_schema(obs_subtask_template['schema'])
+            obs_spec['stations']['digital_pointings'][0]['subbands'] = [0]
+            obs_spec['COBALT']['correlator']['enabled'] = True
+
+            pipe_subtask = self._setup_observation_and_pipeline(client,
+                                                                obs_spec,
+                                                                {"filename": "L123456_SB000.MS",
+                                                                 "specifications_doc": {"sap": "target0", "subband": 0 } },
+                                                                "preprocessing pipeline",
+                                                                "pipeline control",
+                                                                {})
+
+            subtask = client.schedule_subtask(pipe_subtask['id'])
+
+            self.assertEqual('scheduled', subtask['state_value'])
+            self.assertEqual('scheduled', tmss_test_env.ra_test_environment.radb.getTask(tmss_id=pipe_subtask['id'])['status'])
+
+    def test_schedule_pulsar_pipeline_subtask_with_enough_resources_available(self):
+        with tmss_test_env.create_tmss_client() as client:
+            obs_subtask_template = client.get_subtask_template("observation control")
+            obs_spec = {
+                "stations": { "digital_pointings": [ { "name": "target0", "subbands": [0] } ] },
+                "COBALT": {
+                    "version": 1,
+                    "correlator": { "enabled": False },
+                    "beamformer": {
+                        "tab_pipelines": [
+                            {
+                                "SAPs": [ { "name": "target0", "tabs": [ { "coherent": False }, { "coherent": True } ] } ]
+                            }
+                        ]
+                    }
+                }
+            }
+            obs_spec = add_defaults_to_json_object_for_schema(obs_spec, obs_subtask_template['schema'])
+
+            pipe_subtask = self._setup_observation_and_pipeline(client,
+                                                                obs_spec,
+                                                                {"filename": "L123456_SAP000_B000_S0_P000.h5",
+                                                                 "specifications_doc": { "sap": "target0", "coherent": True, "identifiers": { "sap_index": 0, "tab_index": 0, "pipeline_index": 0, "part_index": 0, "stokes_index": 0 } } },
+                                                                "pulsar pipeline",
+                                                                "pulsar pipeline",
+                                                                {})
+
+            subtask = client.schedule_subtask(pipe_subtask['id'])
+
+            self.assertEqual('scheduled', subtask['state_value'])
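Note on the test refactoring above: _setup_observation_and_pipeline() lets each test inject its own observation spec, input dataproduct properties, and pipeline template names, and pipe_spec is now built with add_defaults_to_json_object_for_schema(), so a test only spells out the fields it cares about while the subtask schema supplies the rest. A rough sketch of that merge behaviour using a hand-rolled deep-merge (the real TMSS helper resolves defaults from a JSON schema; this stand-in and its dicts are illustrative only):

    def add_defaults(partial, defaults):
        # recursively fill missing keys from the defaults; assumes plain nested dicts
        merged = dict(defaults)
        for key, value in partial.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = add_defaults(value, merged[key])
            else:
                merged[key] = value
        return merged

    defaults = {"COBALT": {"version": 1, "correlator": {"enabled": True}}}
    partial = {"COBALT": {"correlator": {"enabled": False}}}
    assert add_defaults(partial, defaults) == \
           {"COBALT": {"version": 1, "correlator": {"enabled": False}}}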