diff --git a/SAS/TMSS/backend/services/scheduling/lib/constraints/template_constraints_v1.py b/SAS/TMSS/backend/services/scheduling/lib/constraints/template_constraints_v1.py index 45c2c1e56c6e2be20604a3f1ca3b20e56f8d9a24..9372a56c9fedb98b813fe386393ce4d079b5b5b2 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/constraints/template_constraints_v1.py +++ b/SAS/TMSS/backend/services/scheduling/lib/constraints/template_constraints_v1.py @@ -84,7 +84,8 @@ def can_run_within_timewindow_with_daily_constraints(scheduling_unit: models.Sch :return: True if there is at least one possibility to place the scheduling unit in a way that all daily constraints are met over the runtime of the observation, else False. """ main_observation_task_name = get_longest_observation_task_name_from_requirements_doc(scheduling_unit) - duration = timedelta(seconds=scheduling_unit.requirements_doc['tasks'][main_observation_task_name]['specifications_doc']['duration']) + main_obs_task_spec = scheduling_unit.requirements_doc['tasks'][main_observation_task_name]['specifications_doc'] + duration = timedelta(seconds=main_obs_task_spec.get('duration', main_obs_task_spec.get('target',{}).get('duration',0))) window_lower_bound = lower_bound while window_lower_bound + duration < upper_bound: window_upper_bound = window_lower_bound + duration @@ -101,6 +102,7 @@ def can_run_anywhere_within_timewindow_with_daily_constraints(scheduling_unit: m :return: True if all daily constraints are met over the entire time window, else False. """ main_observation_task_name = get_longest_observation_task_name_from_requirements_doc(scheduling_unit) + main_obs_task_spec = scheduling_unit.requirements_doc['tasks'][main_observation_task_name]['specifications_doc'] constraints = scheduling_unit.scheduling_constraints_doc if not "daily" in constraints: @@ -115,7 +117,7 @@ def can_run_anywhere_within_timewindow_with_daily_constraints(scheduling_unit: m if upper_bound < lower_bound: raise ValueError("Provided upper_bound=%s is earlier than provided lower_bound=%s" % (upper_bound, lower_bound)) - station_groups = scheduling_unit.requirements_doc['tasks'][main_observation_task_name]['specifications_doc']["station_groups"] + station_groups = main_obs_task_spec["station_groups"] stations = list(set(sum([group['stations'] for group in station_groups], []))) # flatten all station_groups to single list # check contraint and return false on first failure @@ -162,6 +164,7 @@ def can_run_within_timewindow_with_time_constraints(scheduling_unit: models.Sche constraints are met over the runtime of the observation, else False. """ main_observation_task_name = get_longest_observation_task_name_from_requirements_doc(scheduling_unit) + main_obs_task_spec = scheduling_unit.requirements_doc['tasks'][main_observation_task_name]['specifications_doc'] constraints = scheduling_unit.scheduling_constraints_doc # Check the 'at' constraint and then only check can_run_anywhere for the single possible time window @@ -171,8 +174,7 @@ def can_run_within_timewindow_with_time_constraints(scheduling_unit: models.Sche return can_run_anywhere_within_timewindow_with_time_constraints(scheduling_unit, lower_bound=at, upper_bound=at + scheduling_unit.duration) else: - duration = timedelta( - seconds=scheduling_unit.requirements_doc['tasks'][main_observation_task_name]['specifications_doc']['duration']) + duration = timedelta(seconds=main_obs_task_spec.get('duration', main_obs_task_spec.get('target', {}).get('duration', 0))) window_lower_bound = lower_bound while window_lower_bound + duration <= upper_bound: window_upper_bound = window_lower_bound + duration @@ -284,7 +286,7 @@ def can_run_anywhere_within_timewindow_with_sky_constraints(scheduling_unit: mod if task['specifications_template'] == 'calibrator observation': min_elevation = Angle(constraints['sky']['min_calibrator_elevation'], unit=astropy.units.rad) main_observation_task_name = get_target_observation_task_name_from_requirements_doc(scheduling_unit) - station_groups = scheduling_unit.requirements_doc['tasks'][main_observation_task_name]['specifications_doc']["station_groups"] + station_groups = main_obs_task_spec["station_groups"] else: min_elevation = Angle(constraints['sky']['min_target_elevation'], unit=astropy.units.rad) station_groups = task['specifications_doc']['station_groups'] @@ -392,11 +394,10 @@ def get_longest_observation_task_name_from_requirements_doc(scheduling_unit: mod longest_observation_duration = 0 for task_name, task in scheduling_unit.requirements_doc['tasks'].items(): if 'observation' in task.get('specifications_template', ''): - if 'duration' in task.get('specifications_doc', {}): - duration = task['specifications_doc']['duration'] - if duration > longest_observation_duration: - longest_observation_duration = duration - longest_observation_task_name = task_name + duration = task['specifications_doc'].get('duration', task['specifications_doc'].get('target',{}).get('duration', 0)) + if duration > longest_observation_duration: + longest_observation_duration = duration + longest_observation_task_name = task_name if longest_observation_task_name is not None: return longest_observation_task_name raise TMSSException("Cannot find a longest observation in scheduling_unit requirements_doc") @@ -407,7 +408,8 @@ def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBluep # TODO: for estimating the earliest_possible_start_time, we need the full duration of the scheduling unit, not just the longest one. main_observation_task_name = get_longest_observation_task_name_from_requirements_doc(scheduling_unit) - duration = timedelta(seconds=scheduling_unit.requirements_doc['tasks'][main_observation_task_name]['specifications_doc']['duration']) + main_obs_task_spec = scheduling_unit.requirements_doc['tasks'][main_observation_task_name]['specifications_doc'] + duration = timedelta(seconds=main_obs_task_spec.get('duration', main_obs_task_spec.get('target',{}).get('duration',0))) try: if 'time' in constraints and 'at' in constraints['time']: at = parser.parse(constraints['time']['at'], ignoretz=True) @@ -418,7 +420,7 @@ def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBluep return max(lower_bound, after) if 'daily' in constraints and (constraints['daily']['require_day'] or constraints['daily']['require_night'] or constraints['daily']['avoid_twilight']): - station_groups = scheduling_unit.requirements_doc['tasks'][main_observation_task_name]['specifications_doc']["station_groups"] + station_groups = main_obs_task_spec["station_groups"] stations = list(set(sum([group['stations'] for group in station_groups], []))) # flatten all station_groups to single list all_sun_events = timestamps_and_stations_to_sun_rise_and_set(timestamps=(lower_bound,lower_bound+timedelta(days=1)), stations=tuple(stations)) start_time_per_station = {} diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py index 316129dea4cfbe713455e59026d02d70235b492f..60d07e5e55fc169b550c7f24a54a646fd6da78d4 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py @@ -189,7 +189,7 @@ class Subtask(BasicCommon, ProjectPropertyMixin, TemplateSchemaMixin): if self.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: # observations have a specified duration, so grab it from the spec. # In case we have several associated tasks: use the longest duration, since we assume that tasks will run in parallel (there would be no reason to combine them into a subtask). - return timedelta(seconds=max([tb.specifications_doc.get('duration', 0) for tb in self.task_blueprints.all()])) + return timedelta(seconds=max([tb.specifications_doc.get('duration', tb.specifications_doc.get('target', {}).get('duration', 0)) for tb in self.task_blueprints.all()])) if self.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: # pipelines usually do not have a specified duration, so make a guess (half the obs duration?). diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/populate.py b/SAS/TMSS/backend/src/tmss/tmssapp/populate.py index 221e5455b06bf149da57dba3d81f710bd0f8cf7e..4968011b312207b95229539a59daf9e4e933a95d 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/populate.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/populate.py @@ -492,19 +492,13 @@ def populate_connectors(): # wild cards, like output_of=NULL meaning "any". logger.info("Populating TaskConnectorType's") - # calibrator observation - TaskConnectorType.objects.create(role=Role.objects.get(value=Role.Choices.CORRELATOR.value), - datatype=Datatype.objects.get(value=Datatype.Choices.VISIBILITIES.value), - dataformat=Dataformat.objects.get(value=Dataformat.Choices.MEASUREMENTSET.value), - task_template=TaskTemplate.objects.get(name='calibrator observation'), - iotype=IOType.objects.get(value=IOType.Choices.OUTPUT.value)) - - # target observation - TaskConnectorType.objects.create(role=Role.objects.get(value=Role.Choices.CORRELATOR.value), - datatype=Datatype.objects.get(value=Datatype.Choices.VISIBILITIES.value), - dataformat=Dataformat.objects.get(value=Dataformat.Choices.MEASUREMENTSET.value), - task_template=TaskTemplate.objects.get(name='target observation'), - iotype=IOType.objects.get(value=IOType.Choices.OUTPUT.value)) + # calibrator, target and combined imaging observations + for task_template_name in ['calibrator observation', 'target observation', 'parallel calibrator target observation']: + TaskConnectorType.objects.create(role=Role.objects.get(value=Role.Choices.CORRELATOR.value), + datatype=Datatype.objects.get(value=Datatype.Choices.VISIBILITIES.value), + dataformat=Dataformat.objects.get(value=Dataformat.Choices.MEASUREMENTSET.value), + task_template=TaskTemplate.objects.get(name=task_template_name), + iotype=IOType.objects.get(value=IOType.Choices.OUTPUT.value)) # beamforming observation TaskConnectorType.objects.create(role=Role.objects.get(value=Role.Choices.BEAMFORMER.value), diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/LBA-survey-observation-scheduling-unit-observation-strategy.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/LBA-survey-observation-scheduling-unit-observation-strategy.json index 809d98e79ce1a8a8dc97d9bbd03da6a97d5a97bc..0c938dc55bbd9b520b837a405ba94652f55001dc 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/LBA-survey-observation-scheduling-unit-observation-strategy.json +++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/LBA-survey-observation-scheduling-unit-observation-strategy.json @@ -1,803 +1,803 @@ { - "tasks": { - "Ingest": { - "tags": [], - "description": "Ingest all preprocessed dataproducts", - "specifications_doc": {}, - "specifications_template": "ingest" + "tasks":{ + "Combined Observation":{ + "description":"Combined parallel Calibrator & Target Observation for UC1 LBA scheduling unit", + "specifications_doc":{ + "calibrator":{ + "name":"calibrator", + "duration":120, + "pointing":{ + "angle1":0.6624317181687094, + "angle2":1.5579526427549426, + "direction_type":"J2000" + }, + "autoselect":false }, - "Pipeline target1": { - "tags": [], - "description": "Preprocessing Pipeline for Target Observation target1", - "specifications_doc": { - "flag": { - "rfi_strategy": "LBAdefault", - "outerchannels": true, - "autocorrelations": true - }, - "demix": { - "sources": {}, - "time_steps": 10, - "ignore_target": false, - "frequency_steps": 64 - }, - "average": { - "time_steps": 1, - "frequency_steps": 4 - }, - "storagemanager": "dysco" + "target":{ + "QA":{ + "plots":{ + "enabled":true, + "autocorrelation":true, + "crosscorrelation":true }, - "specifications_template": "preprocessing pipeline" - }, - "Pipeline target2": { - "tags": [], - "description": "Preprocessing Pipeline for Target Observation target2", - "specifications_doc": { - "flag": { - "rfi_strategy": "LBAdefault", - "outerchannels": true, - "autocorrelations": true - }, - "demix": { - "sources": {}, - "time_steps": 10, - "ignore_target": false, - "frequency_steps": 64 - }, - "average": { - "time_steps": 1, - "frequency_steps": 4 - }, - "storagemanager": "dysco" - }, - "specifications_template": "preprocessing pipeline" - }, - "Pipeline target3": { - "tags": [], - "description": "Preprocessing Pipeline for Target Observation target3", - "specifications_doc": { - "flag": { - "rfi_strategy": "LBAdefault", - "outerchannels": true, - "autocorrelations": true - }, - "demix": { - "sources": {}, - "time_steps": 10, - "ignore_target": false, - "frequency_steps": 64 - }, - "average": { - "time_steps": 1, - "frequency_steps": 4 - }, - "storagemanager": "dysco" + "file_conversion":{ + "enabled":true, + "nr_of_subbands":-1, + "nr_of_timestamps":256 + } + }, + "SAPs":[ + { + "name":"target1", + "subbands":[ + 217, + 218, + 219, + 220, + 221, + 222, + 223, + 224, + 225, + 226, + 227, + 228, + 229, + 230, + 231, + 232, + 233, + 234, + 235, + 236, + 237, + 238, + 239, + 240, + 241, + 242, + 243, + 244, + 245, + 246, + 247, + 248, + 249, + 250, + 251, + 252, + 253, + 254, + 255, + 256, + 257, + 258, + 259, + 260, + 261, + 262, + 263, + 264, + 265, + 266, + 267, + 268, + 269, + 270, + 271, + 272, + 273, + 274, + 275, + 276, + 277, + 278, + 279, + 280, + 281, + 282, + 283, + 284, + 285, + 286, + 287, + 288, + 289, + 290, + 291, + 292, + 293, + 294, + 295, + 296, + 297, + 298, + 299, + 300, + 301, + 302, + 303, + 304, + 305, + 306, + 307, + 308, + 309, + 310, + 311, + 312, + 313, + 314, + 315, + 316, + 317, + 318, + 319, + 320, + 321, + 322, + 323, + 324, + 325, + 326, + 327, + 328, + 329, + 330, + 331, + 332, + 333, + 334, + 335, + 336, + 337, + 338 + ], + "digital_pointing":{ + "angle1":0.6624317181687094, + "angle2":1.5579526427549426, + "direction_type":"J2000" + } }, - "specifications_template": "preprocessing pipeline" - }, - "Target Observation": { - "tags": [], - "description": "Target Observation for UC1 LBA scheduling unit", - "specifications_doc": { - "QA": { - "plots": { - "enabled": true, - "autocorrelation": true, - "crosscorrelation": true - }, - "file_conversion": { - "enabled": true, - "nr_of_subbands": -1, - "nr_of_timestamps": 256 - } - }, - "SAPs": [ - { - "name": "target1", - "subbands": [ - 217, - 218, - 219, - 220, - 221, - 222, - 223, - 224, - 225, - 226, - 227, - 228, - 229, - 230, - 231, - 232, - 233, - 234, - 235, - 236, - 237, - 238, - 239, - 240, - 241, - 242, - 243, - 244, - 245, - 246, - 247, - 248, - 249, - 250, - 251, - 252, - 253, - 254, - 255, - 256, - 257, - 258, - 259, - 260, - 261, - 262, - 263, - 264, - 265, - 266, - 267, - 268, - 269, - 270, - 271, - 272, - 273, - 274, - 275, - 276, - 277, - 278, - 279, - 280, - 281, - 282, - 283, - 284, - 285, - 286, - 287, - 288, - 289, - 290, - 291, - 292, - 293, - 294, - 295, - 296, - 297, - 298, - 299, - 300, - 301, - 302, - 303, - 304, - 305, - 306, - 307, - 308, - 309, - 310, - 311, - 312, - 313, - 314, - 315, - 316, - 317, - 318, - 319, - 320, - 321, - 322, - 323, - 324, - 325, - 326, - 327, - 328, - 329, - 330, - 331, - 332, - 333, - 334, - 335, - 336, - 337, - 338 - ], - "digital_pointing": { - "angle1": 0.6624317181687094, - "angle2": 1.5579526427549426, - "direction_type": "J2000" - } - }, - { - "name": "target2", - "subbands": [ - 217, - 218, - 219, - 220, - 221, - 222, - 223, - 224, - 225, - 226, - 227, - 228, - 229, - 230, - 231, - 232, - 233, - 234, - 235, - 236, - 237, - 238, - 239, - 240, - 241, - 242, - 243, - 244, - 245, - 246, - 247, - 248, - 249, - 250, - 251, - 252, - 253, - 254, - 255, - 256, - 257, - 258, - 259, - 260, - 261, - 262, - 263, - 264, - 265, - 266, - 267, - 268, - 269, - 270, - 271, - 272, - 273, - 274, - 275, - 276, - 277, - 278, - 279, - 280, - 281, - 282, - 283, - 284, - 285, - 286, - 287, - 288, - 289, - 290, - 291, - 292, - 293, - 294, - 295, - 296, - 297, - 298, - 299, - 300, - 301, - 302, - 303, - 304, - 305, - 306, - 307, - 308, - 309, - 310, - 311, - 312, - 313, - 314, - 315, - 316, - 317, - 318, - 319, - 320, - 321, - 322, - 323, - 324, - 325, - 326, - 327, - 328, - 329, - 330, - 331, - 332, - 333, - 334, - 335, - 336, - 337, - 338 - ], - "digital_pointing": { - "angle1": 0.6624317181687094, - "angle2": 1.5579526427549426, - "direction_type": "J2000" - } - }, - { - "name": "target3", - "subbands": [ - 217, - 218, - 219, - 220, - 221, - 222, - 223, - 224, - 225, - 226, - 227, - 228, - 229, - 230, - 231, - 232, - 233, - 234, - 235, - 236, - 237, - 238, - 239, - 240, - 241, - 242, - 243, - 244, - 245, - 246, - 247, - 248, - 249, - 250, - 251, - 252, - 253, - 254, - 255, - 256, - 257, - 258, - 259, - 260, - 261, - 262, - 263, - 264, - 265, - 266, - 267, - 268, - 269, - 270, - 271, - 272, - 273, - 274, - 275, - 276, - 277, - 278, - 279, - 280, - 281, - 282, - 283, - 284, - 285, - 286, - 287, - 288, - 289, - 290, - 291, - 292, - 293, - 294, - 295, - 296, - 297, - 298, - 299, - 300, - 301, - 302, - 303, - 304, - 305, - 306, - 307, - 308, - 309, - 310, - 311, - 312, - 313, - 314, - 315, - 316, - 317, - 318, - 319, - 320, - 321, - 322, - 323, - 324, - 325, - 326, - 327, - 328, - 329, - 330, - 331, - 332, - 333, - 334, - 335, - 336, - 337, - 338 - ], - "digital_pointing": { - "angle1": 0.6624317181687094, - "angle2": 1.5579526427549426, - "direction_type": "J2000" - } - } - ], - "filter": "LBA_30_90", - "duration": 120, - "correlator": { - "storage_cluster": "CEP4", - "integration_time": 1, - "channels_per_subband": 64 - }, - "antenna_set": "LBA_SPARSE_EVEN", - "station_groups": [ - { - "stations": [ - "CS001", - "CS002", - "CS003", - "CS004", - "CS005", - "CS006", - "CS007", - "CS011", - "CS013", - "CS017", - "CS021", - "CS024", - "CS026", - "CS028", - "CS030", - "CS031", - "CS032", - "CS101", - "CS103", - "CS201", - "CS301", - "CS302", - "CS401", - "CS501", - "RS106", - "RS205", - "RS208", - "RS210", - "RS305", - "RS306", - "RS307", - "RS310", - "RS406", - "RS407", - "RS409", - "RS503", - "RS508", - "RS509" - ], - "max_nr_missing": 4 - }, - { - "stations": [ - "RS508", - "RS509" - ], - "max_nr_missing": 1 - }, - { - "stations": [ - "RS310", - "RS210" - ], - "max_nr_missing": 0 - } - ] + { + "name":"target2", + "subbands":[ + 217, + 218, + 219, + 220, + 221, + 222, + 223, + 224, + 225, + 226, + 227, + 228, + 229, + 230, + 231, + 232, + 233, + 234, + 235, + 236, + 237, + 238, + 239, + 240, + 241, + 242, + 243, + 244, + 245, + 246, + 247, + 248, + 249, + 250, + 251, + 252, + 253, + 254, + 255, + 256, + 257, + 258, + 259, + 260, + 261, + 262, + 263, + 264, + 265, + 266, + 267, + 268, + 269, + 270, + 271, + 272, + 273, + 274, + 275, + 276, + 277, + 278, + 279, + 280, + 281, + 282, + 283, + 284, + 285, + 286, + 287, + 288, + 289, + 290, + 291, + 292, + 293, + 294, + 295, + 296, + 297, + 298, + 299, + 300, + 301, + 302, + 303, + 304, + 305, + 306, + 307, + 308, + 309, + 310, + 311, + 312, + 313, + 314, + 315, + 316, + 317, + 318, + 319, + 320, + 321, + 322, + 323, + 324, + 325, + 326, + 327, + 328, + 329, + 330, + 331, + 332, + 333, + 334, + 335, + 336, + 337, + 338 + ], + "digital_pointing":{ + "angle1":0.6624317181687094, + "angle2":1.5579526427549426, + "direction_type":"J2000" + } }, - "specifications_template": "target observation" - }, - "Calibrator Pipeline": { - "tags": [], - "description": "Preprocessing Pipeline for Calibrator Observation", - "specifications_doc": { - "flag": { - "rfi_strategy": "LBAdefault", - "outerchannels": true, - "autocorrelations": true - }, - "demix": { - "sources": {}, - "time_steps": 10, - "ignore_target": false, - "frequency_steps": 64 - }, - "average": { - "time_steps": 1, - "frequency_steps": 4 - }, - "storagemanager": "dysco" + { + "name":"target3", + "subbands":[ + 217, + 218, + 219, + 220, + 221, + 222, + 223, + 224, + 225, + 226, + 227, + 228, + 229, + 230, + 231, + 232, + 233, + 234, + 235, + 236, + 237, + 238, + 239, + 240, + 241, + 242, + 243, + 244, + 245, + 246, + 247, + 248, + 249, + 250, + 251, + 252, + 253, + 254, + 255, + 256, + 257, + 258, + 259, + 260, + 261, + 262, + 263, + 264, + 265, + 266, + 267, + 268, + 269, + 270, + 271, + 272, + 273, + 274, + 275, + 276, + 277, + 278, + 279, + 280, + 281, + 282, + 283, + 284, + 285, + 286, + 287, + 288, + 289, + 290, + 291, + 292, + 293, + 294, + 295, + 296, + 297, + 298, + 299, + 300, + 301, + 302, + 303, + 304, + 305, + 306, + 307, + 308, + 309, + 310, + 311, + 312, + 313, + 314, + 315, + 316, + 317, + 318, + 319, + 320, + 321, + 322, + 323, + 324, + 325, + 326, + 327, + 328, + 329, + 330, + 331, + 332, + 333, + 334, + 335, + 336, + 337, + 338 + ], + "digital_pointing":{ + "angle1":0.6624317181687094, + "angle2":1.5579526427549426, + "direction_type":"J2000" + } + } + ], + "filter":"LBA_30_90", + "duration":120, + "correlator":{ + "storage_cluster":"CEP4", + "integration_time":1, + "channels_per_subband":64 + }, + "antenna_set":"LBA_SPARSE_EVEN", + "station_groups":[ + { + "stations":[ + "CS001", + "CS002", + "CS003", + "CS004", + "CS005", + "CS006", + "CS007", + "CS011", + "CS013", + "CS017", + "CS021", + "CS024", + "CS026", + "CS028", + "CS030", + "CS031", + "CS032", + "CS101", + "CS103", + "CS201", + "CS301", + "CS302", + "CS401", + "CS501", + "RS106", + "RS205", + "RS208", + "RS210", + "RS305", + "RS306", + "RS307", + "RS310", + "RS406", + "RS407", + "RS409", + "RS503", + "RS508", + "RS509" + ], + "max_nr_missing":4 }, - "specifications_template": "preprocessing pipeline" - }, - "Calibrator Observation": { - "tags": [], - "description": "Calibrator Observation for UC1 LBA scheduling unit", - "specifications_doc": { - "name": "calibrator", - "duration": 120, - "pointing": { - "angle1": 0.6624317181687094, - "angle2": 1.5579526427549426, - "direction_type": "J2000" - }, - "autoselect": false + { + "stations":[ + "RS508", + "RS509" + ], + "max_nr_missing":1 }, - "specifications_template": "calibrator observation" + { + "stations":[ + "RS310", + "RS210" + ], + "max_nr_missing":0 + } + ] } + }, + "specifications_template":"parallel calibrator target observation" }, - "parameters": [ - { - "name": "Target Pointing 1", - "refs": [ - "#/tasks/Target Observation/specifications_doc/SAPs/0/digital_pointing" - ] + "Ingest":{ + "description":"Ingest all preprocessed dataproducts", + "specifications_doc":{ + + }, + "specifications_template":"ingest" + }, + "Pipeline target1":{ + "description":"Preprocessing Pipeline for Target Observation target1", + "specifications_doc":{ + "flag":{ + "rfi_strategy":"LBAdefault", + "outerchannels":true, + "autocorrelations":true }, - { - "name": "Target Pointing 2", - "refs": [ - "#/tasks/Target Observation/specifications_doc/SAPs/1/digital_pointing" - ] + "demix":{ + "sources":{ + + }, + "time_steps":10, + "ignore_target":false, + "frequency_steps":64 }, - { - "name": "Target Pointing 3", - "refs": [ - "#/tasks/Target Observation/specifications_doc/SAPs/2/digital_pointing" - ] + "average":{ + "time_steps":1, + "frequency_steps":4 }, - { - "name": "Time averaging steps", - "refs": [ - "#/tasks/Pipeline target1/specifications_doc/average/time_steps", - "#/tasks/Pipeline target2/specifications_doc/average/time_steps", - "#/tasks/Pipeline target3/specifications_doc/average/time_steps" - ] + "storagemanager":"dysco" + }, + "specifications_template":"preprocessing pipeline" + }, + "Pipeline target2":{ + "description":"Preprocessing Pipeline for Target Observation target2", + "specifications_doc":{ + "flag":{ + "rfi_strategy":"LBAdefault", + "outerchannels":true, + "autocorrelations":true }, - { - "name": "Frequency averaging steps", - "refs": [ - "#/tasks/Pipeline target1/specifications_doc/average/frequency_steps", - "#/tasks/Pipeline target2/specifications_doc/average/frequency_steps", - "#/tasks/Pipeline target3/specifications_doc/average/frequency_steps" - ] + "demix":{ + "sources":{ + + }, + "time_steps":10, + "ignore_target":false, + "frequency_steps":64 }, - { - "name": "Demix sources", - "refs": [ - "#/tasks/Pipeline target1/specifications_doc/demix/sources", - "#/tasks/Pipeline target2/specifications_doc/demix/sources", - "#/tasks/Pipeline target3/specifications_doc/demix/sources" - ] - } - ], - "task_relations": [ - { - "tags": [], - "input": { - "role": "any", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "output": { - "role": "correlator", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "consumer": "Calibrator Pipeline", - "producer": "Calibrator Observation", - "selection_doc": {}, - "selection_template": "all" + "average":{ + "time_steps":1, + "frequency_steps":4 }, - { - "tags": [], - "input": { - "role": "any", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "output": { - "role": "correlator", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "consumer": "Pipeline target1", - "producer": "Target Observation", - "selection_doc": { - "sap": [ - "target1" - ] - }, - "selection_template": "SAP" + "storagemanager":"dysco" + }, + "specifications_template":"preprocessing pipeline" + }, + "Pipeline target3":{ + "description":"Preprocessing Pipeline for Target Observation target3", + "specifications_doc":{ + "flag":{ + "rfi_strategy":"LBAdefault", + "outerchannels":true, + "autocorrelations":true }, - { - "tags": [], - "input": { - "role": "any", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "output": { - "role": "correlator", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "consumer": "Pipeline target2", - "producer": "Target Observation", - "selection_doc": { - "sap": [ - "target2" - ] - }, - "selection_template": "SAP" + "demix":{ + "sources":{ + + }, + "time_steps":10, + "ignore_target":false, + "frequency_steps":64 }, - { - "tags": [], - "input": { - "role": "any", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "output": { - "role": "correlator", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "consumer": "Pipeline target3", - "producer": "Target Observation", - "selection_doc": { - "sap": [ - "target3" - ] - }, - "selection_template": "SAP" + "average":{ + "time_steps":1, + "frequency_steps":4 }, - { - "tags": [], - "input": { - "role": "any", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "output": { - "role": "any", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "consumer": "Ingest", - "producer": "Calibrator Pipeline", - "selection_doc": {}, - "selection_template": "all" + "storagemanager":"dysco" + }, + "specifications_template":"preprocessing pipeline" + }, + "Calibrator Pipeline":{ + "description":"Preprocessing Pipeline for Calibrator Observation", + "specifications_doc":{ + "flag":{ + "rfi_strategy":"LBAdefault", + "outerchannels":true, + "autocorrelations":true }, - { - "tags": [], - "input": { - "role": "any", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "output": { - "role": "any", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "consumer": "Ingest", - "producer": "Pipeline target1", - "selection_doc": {}, - "selection_template": "all" + "demix":{ + "sources":{ + + }, + "time_steps":10, + "ignore_target":false, + "frequency_steps":64 }, - { - "tags": [], - "input": { - "role": "any", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "output": { - "role": "any", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "consumer": "Ingest", - "producer": "Pipeline target2", - "selection_doc": {}, - "selection_template": "all" + "average":{ + "time_steps":1, + "frequency_steps":4 }, - { - "tags": [], - "input": { - "role": "any", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "output": { - "role": "any", - "datatype": "visibilities", - "dataformat": "MeasurementSet" - }, - "consumer": "Ingest", - "producer": "Pipeline target3", - "selection_doc": {}, - "selection_template": "all" - } - ], - "task_scheduling_relations": [ - { - "first": "Calibrator Observation", - "second": "Target Observation", - "placement": "parallel", - "time_offset": 0 - } - ] + "storagemanager":"dysco" + }, + "specifications_template":"preprocessing pipeline" + } + }, + "parameters":[ + { + "name":"Target Pointing 1", + "refs":[ + "#/tasks/Combined Observation/specifications_doc/SAPs/0/digital_pointing" + ] + }, + { + "name":"Target Pointing 2", + "refs":[ + "#/tasks/Combined Observation/specifications_doc/SAPs/1/digital_pointing" + ] + }, + { + "name":"Target Pointing 3", + "refs":[ + "#/tasks/Combined Observation/specifications_doc/SAPs/2/digital_pointing" + ] + }, + { + "name":"Time averaging steps", + "refs":[ + "#/tasks/Pipeline target1/specifications_doc/average/time_steps", + "#/tasks/Pipeline target2/specifications_doc/average/time_steps", + "#/tasks/Pipeline target3/specifications_doc/average/time_steps" + ] + }, + { + "name":"Frequency averaging steps", + "refs":[ + "#/tasks/Pipeline target1/specifications_doc/average/frequency_steps", + "#/tasks/Pipeline target2/specifications_doc/average/frequency_steps", + "#/tasks/Pipeline target3/specifications_doc/average/frequency_steps" + ] + }, + { + "name":"Demix sources", + "refs":[ + "#/tasks/Pipeline target1/specifications_doc/demix/sources", + "#/tasks/Pipeline target2/specifications_doc/demix/sources", + "#/tasks/Pipeline target3/specifications_doc/demix/sources" + ] + } + ], + "task_relations":[ + { + "input":{ + "role":"any", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "output":{ + "role":"correlator", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "consumer":"Calibrator Pipeline", + "producer":"Combined Observation", + "selection_doc":{ + "sap":[ + "calibrator" + ] + }, + "selection_template":"SAP" + }, + { + "input":{ + "role":"any", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "output":{ + "role":"correlator", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "consumer":"Pipeline target1", + "producer":"Combined Observation", + "selection_doc":{ + "sap":[ + "target1" + ] + }, + "selection_template":"SAP" + }, + { + "input":{ + "role":"any", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "output":{ + "role":"correlator", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "consumer":"Pipeline target2", + "producer":"Combined Observation", + "selection_doc":{ + "sap":[ + "target2" + ] + }, + "selection_template":"SAP" + }, + { + "input":{ + "role":"any", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "output":{ + "role":"correlator", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "consumer":"Pipeline target3", + "producer":"Combined Observation", + "selection_doc":{ + "sap":[ + "target3" + ] + }, + "selection_template":"SAP" + }, + { + "input":{ + "role":"any", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "output":{ + "role":"any", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "consumer":"Ingest", + "producer":"Calibrator Pipeline", + "selection_doc":{ + + }, + "selection_template":"all" + }, + { + "input":{ + "role":"any", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "output":{ + "role":"any", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "consumer":"Ingest", + "producer":"Pipeline target1", + "selection_doc":{ + + }, + "selection_template":"all" + }, + { + "input":{ + "role":"any", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "output":{ + "role":"any", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "consumer":"Ingest", + "producer":"Pipeline target2", + "selection_doc":{ + + }, + "selection_template":"all" + }, + { + "input":{ + "role":"any", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "output":{ + "role":"any", + "datatype":"visibilities", + "dataformat":"MeasurementSet" + }, + "consumer":"Ingest", + "producer":"Pipeline target3", + "selection_doc":{ + + }, + "selection_template":"all" + } + ], + "task_scheduling_relations":[ + + ] } \ No newline at end of file diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/task_template-parallel_calibrator_target_observation-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/task_template-parallel_calibrator_target_observation-1.json new file mode 100644 index 0000000000000000000000000000000000000000..390a9de17cd81f33d40c9e31ffbae1a9f2de8c62 --- /dev/null +++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/task_template-parallel_calibrator_target_observation-1.json @@ -0,0 +1,142 @@ +{ + "$id": "http://tmss.lofar.org/api/schemas/tasktemplate/parallel calibrator target observation/1#", + "$schema": "http://json-schema.org/draft-06/schema#", + "title": "parallel calibrator target observation", + "description": "This schema combines the calibrator and target observation task schema's for parallel observing.", + "version": 1, + "type": "object", + "properties": { + "calibrator": { + "description": "This subschema defines the (extra) parameters to setup a calibrator observation task, which uses all paramters from the target observation task which it is linked to, plus these calibrator overrides.", + "type": "object", + "default": {}, + "properties": { + "autoselect": { + "type": "boolean", + "title": "Auto-select", + "description": "Auto-select calibrator based on elevation", + "default": true + }, + "pointing": { + "title": "Digital pointing", + "$ref": "http://tmss.lofar.org/api/schemas/commonschematemplate/pointing/1/#/definitions/pointing", + "description": "Manually selected calibrator", + "default": {} + }, + "name": { + "title": "Name", + "description": "Name of the calibrator SAP", + "type": "string", + "default": "calibrator" + } + }, + "required": [ + "autoselect", "pointing", "name" + ] + }, + "target": { + "description": "This subschema defines the parameters to setup the target observation part.", + "type": "object", + "default": {}, + "properties": { + "station_groups": { + "$ref": "http://tmss.lofar.org/api/schemas/commonschematemplate/stations/1#/definitions/station_groups", + "default": [ { + "stations": ["CS002", "CS003", "CS004", "CS005", "CS006", "CS007"], + "max_nr_missing": 1 + } ] + }, + "antenna_set": { + "$ref": "http://tmss.lofar.org/api/schemas/commonschematemplate/stations/1/#/definitions/antenna_set", + "default": "HBA_DUAL" + }, + "filter": { + "$ref": "http://tmss.lofar.org/api/schemas/commonschematemplate/stations/1/#/definitions/filter", + "default": "HBA_110_190" + }, + "tile_beam": { + "title": "Tile beam", + "description": "HBA only", + "default": {}, + "$ref": "http://tmss.lofar.org/api/schemas/commonschematemplate/pointing/1/#/definitions/pointing" + }, + "SAPs": { + "$ref": "http://tmss.lofar.org/api/schemas/commonschematemplate/stations/1/#/definitions/SAPs", + "minItems": 1, + "default": [{}] + }, + "duration": { + "$id": "#duration", + "$ref": "http://tmss.lofar.org/api/schemas/commonschematemplate/datetime/1/#/definitions/timedelta", + "title": "Duration", + "description": "Duration of this observation (seconds)", + "default": 300, + "minimum": 1 + }, + "correlator": { + "title": "Correlator Settings", + "type": "object", + "additionalProperties": false, + "default": {}, + "properties": { + "channels_per_subband": { + "type": "integer", + "title": "Channels/subband", + "description": "Number of frequency bands per subband", + "default": 64, + "minimum": 8, + "enum": [ + 8, + 16, + 32, + 64, + 128, + 256, + 512, + 1024 + ] + }, + "integration_time": { + "title": "Integration time", + "description": "Desired integration period (seconds)", + "$ref": "http://tmss.lofar.org/api/schemas/commonschematemplate/datetime/1/#/definitions/timedelta", + "default": 1, + "minimum": 0.1 + }, + "storage_cluster": { + "type": "string", + "title": "Storage cluster", + "description": "Cluster to write output to", + "default": "CEP4", + "enum": [ + "CEP4", + "DragNet" + ] + } + }, + "required": [ + "channels_per_subband", + "integration_time", + "storage_cluster" + ] + }, + "QA": { + "default":{}, + "$ref": "http://tmss.lofar.org/api/schemas/commonschematemplate/QA/1#/definitions/QA" + } + }, + "required": [ + "station_groups", + "antenna_set", + "filter", + "SAPs", + "duration", + "correlator" + ] + } + }, + "required": [ + "calibrator", + "target" + ] +} diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/templates.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/templates.json index 0b9d027a6e1721604051537375a2ed13e6f9faa9..a151cc5fc1875ad2fb3901abdb78692f54a1e327 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/templates.json +++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/templates.json @@ -87,6 +87,12 @@ "type": "observation", "validation_code_js": "" }, + { + "file_name": "task_template-parallel_calibrator_target_observation-1.json", + "template": "task_template", + "type": "observation", + "validation_code_js": "" + }, { "file_name": "task_template-beamforming_observation-1.json", "template": "task_template", @@ -323,3 +329,5 @@ "version": 1 } ] + + diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 05eab32c6709a745defcdf6f9a34c37fdf39bc15..890d029e6cebbd73213ea467325837e91355e7ef 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -74,6 +74,7 @@ def create_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subta 'ingest': [create_ingest_subtask_from_task_blueprint], 'cleanup': [create_cleanup_subtask_from_task_blueprint]} generators_mapping['calibrator observation'] = generators_mapping['target observation'] + generators_mapping['parallel calibrator target observation'] = generators_mapping['target observation'] generators_mapping['beamforming observation'] = [create_observation_control_subtask_from_task_blueprint] with transaction.atomic(): @@ -149,13 +150,34 @@ def _get_related_target_sap_by_name(task_blueprint, sap_name): return target_sap raise SubtaskCreationException("Cannot create beamformer subtask from task id=%s because it does not contain target SAP with name=%s" % (task_blueprint.id, sap_name)) +def _get_target_and_or_calibrator_specification(task_blueprint: TaskBlueprint) -> (dict, dict): + # extract the individual target/calibrator specs if possible + if task_blueprint.specifications_template.name == 'parallel calibrator target observation': + return (task_blueprint.specifications_doc['target'], task_blueprint.specifications_doc['calibrator']) + + if task_blueprint.specifications_template.name in ('target observation', 'beamforming observation'): + cal_task, _ = _get_related_observation_task_blueprint(task_blueprint, 'calibrator observation') + if cal_task is None: + return (task_blueprint.specifications_doc, None) + else: + return (task_blueprint.specifications_doc, cal_task.specifications_doc) + + if task_blueprint.specifications_template.name == 'calibrator observation': + target_task, _ = _get_related_observation_task_blueprint(task_blueprint, 'target observation') + if target_task is None: + return (None, task_blueprint.specifications_doc) + else: + return (target_task.specifications_doc, task_blueprint.specifications_doc) + + return None, None + def create_observation_subtask_specifications_from_observation_task_blueprint(task_blueprint: TaskBlueprint) -> (dict, SubtaskTemplate): """ Create a valid observation subtask specification ('observation control' SubtaskTemplate schema) based on the task_blueprint's settings """ # check if task_blueprint has an observation-like specification - if task_blueprint.specifications_template.name.lower() not in ['target observation', 'calibrator observation', 'beamforming observation']: + if task_blueprint.specifications_template.name.lower() not in ['target observation', 'calibrator observation', 'beamforming observation', 'parallel calibrator target observation']: raise SubtaskCreationException("Cannot create observation subtask specifications from task_blueprint id=%s with template name='%s'" % ( task_blueprint.id, task_blueprint.specifications_template.name)) @@ -169,81 +191,24 @@ def create_observation_subtask_specifications_from_observation_task_blueprint(ta # now go over the settings in the task_spec and 'copy'/'convert' them to the subtask_spec task_spec = task_blueprint.specifications_doc + # extract the individual target/calibrator specs if possible + target_task_spec, calibrator_task_spec = _get_target_and_or_calibrator_specification(task_blueprint) # block size calculator will need to be fed all the relevant specs cobalt_calculator_constraints = BlockConstraints(None, [], []) - # The calibrator has a minimal calibration-specific specification subset. - # The rest of it's specs are 'shared' with the target observation. - # So... copy the calibrator specs first, then loop over the shared target/calibrator specs... - if 'calibrator' in task_blueprint.specifications_template.name.lower(): - # Calibrator requires related Target Task Observation for some specifications - target_task_blueprint, _ = get_related_target_observation_task_blueprint(task_blueprint) - if target_task_blueprint is None: - raise SubtaskCreationException("Cannot create calibrator observation subtask specifications from task_blueprint id=%s with template name='%s' because no related target observation task_blueprint is found" % (task_blueprint.id, task_blueprint.specifications_template.name)) - target_task_spec = target_task_blueprint.specifications_doc - - if task_spec.get('autoselect', True): - logger.info("auto-selecting calibrator target based on elevation of target observation...") - # Get related Target Observation Task - if "tile_beam" in target_task_spec: - subtask_spec['stations']['analog_pointing'] = { - "direction_type": target_task_spec["tile_beam"]["direction_type"], - "angle1": target_task_spec["tile_beam"]["angle1"], - "angle2": target_task_spec["tile_beam"]["angle2"]} - else: - raise SubtaskCreationException("Cannot determine the pointing specification from task_blueprint " - "id=%s in auto-select mode, because the related target observation " - "task_blueprint id=%s has no tile beam pointing defined" % ( - task_blueprint.id, target_task_blueprint.id)) - else: - subtask_spec['stations']['analog_pointing'] = {"direction_type": task_spec["pointing"]["direction_type"], - "angle1": task_spec["pointing"]["angle1"], - "angle2": task_spec["pointing"]["angle2"]} - - # for the calibrator, the subbands are the union of the subbands of the targetobs - subbands = [] - for SAP in target_task_spec['SAPs']: - subbands.extend(SAP['subbands']) - subbands = sorted(list(set(subbands))) - - # for the calibrator, the digital pointing is equal to the analog pointing - subtask_spec['stations']['digital_pointings'] = [ {'name': task_spec['name'], - 'subbands': subbands, - 'pointing': subtask_spec['stations']['analog_pointing'] } ] - # Use the Task Specification of the Target Observation - task_spec = target_task_spec - logger.info("Using station and correlator settings for calibrator observation task_blueprint id=%s from target observation task_blueprint id=%s", - task_blueprint.id, target_task_blueprint.id) - # correlator subtask_spec["COBALT"]["correlator"] = { "enabled": False } - if "correlator" in task_spec: + if target_task_spec and "correlator" in target_task_spec: subtask_spec["COBALT"]["correlator"]["enabled"] = True - subtask_spec["COBALT"]["correlator"]["channels_per_subband"] = task_spec["correlator"]["channels_per_subband"] + subtask_spec["COBALT"]["correlator"]["channels_per_subband"] = target_task_spec["correlator"]["channels_per_subband"] corr = CorrelatorSettings() - corr.nrChannelsPerSubband = task_spec["correlator"]["channels_per_subband"] - corr.integrationTime = task_spec["correlator"]["integration_time"] + corr.nrChannelsPerSubband = target_task_spec["correlator"]["channels_per_subband"] + corr.integrationTime = target_task_spec["correlator"]["integration_time"] cobalt_calculator_constraints.correlator = corr - # At this moment of subtask creation we known which stations we *want* from the task_spec - # But we do not know yet which stations are available at the moment of observing. - # So, we decided that we set the subtask station_list as the union of all stations in all specified groups. - # This way, the user can see which stations are (likely) to be used. - # At the moment of scheduling of this subtask, then station_list is re-evaluated, and the max_nr_missing per group is validated. - subtask_spec['stations']['station_list'] = [] - if "station_groups" in task_spec: - for station_group in task_spec["station_groups"]: - subtask_spec['stations']['station_list'].extend(station_group["stations"]) - # make list have unique items - subtask_spec['stations']['station_list'] = sorted(list(set(subtask_spec['stations']['station_list']))) - - if not subtask_spec['stations']['station_list']: - raise SubtaskCreationException("Cannot create observation subtask specifications for task_blueprint id=%s. No stations are defined." % (task_blueprint.id,)) - - # The beamformer obs has a beamformer-specific specification block. # The rest of it's specs is the same as in a target observation. # So... copy the beamformer specs first, then loop over the shared specs... @@ -335,13 +300,10 @@ def create_observation_subtask_specifications_from_observation_task_blueprint(ta # if sap['subbands'] == target_sap['subbands']: # todo: is this really required? pseudo-code in confluence suggests so, but what harm does the list do? # sap['subbands'] = [] - subtask_spec['stations']["antenna_set"] = task_spec["antenna_set"] - subtask_spec['stations']["filter"] = task_spec["filter"] - - if 'calibrator' not in task_blueprint.specifications_template.name.lower() and \ - 'beamformer' not in task_blueprint.specifications_template.name.lower(): + if target_task_spec and ('target' in task_blueprint.specifications_template.name.lower() or 'beamforming' in task_blueprint.specifications_template.name.lower()): # copy/convert the analoge/digital_pointings only for non-calibrator observations (the calibrator has its own pointing) - for sap in task_spec.get("SAPs", []): + # for parallel calibrator&target observations the calibrator pointing is added later as a final sap. + for sap in target_task_spec.get("SAPs", []): subtask_spec['stations']['digital_pointings'].append( {"name": sap["name"], "target": sap["target"], @@ -351,18 +313,75 @@ def create_observation_subtask_specifications_from_observation_task_blueprint(ta "subbands": sap["subbands"] }) - if "tile_beam" in task_spec: - subtask_spec['stations']['analog_pointing'] = { "direction_type": task_spec["tile_beam"]["direction_type"], - "angle1": task_spec["tile_beam"]["angle1"], - "angle2": task_spec["tile_beam"]["angle2"] } + if "tile_beam" in target_task_spec: + subtask_spec['stations']['analog_pointing'] = { "direction_type": target_task_spec["tile_beam"]["direction_type"], + "angle1": target_task_spec["tile_beam"]["angle1"], + "angle2": target_task_spec["tile_beam"]["angle2"] } + # The calibrator has a minimal calibration-specific specification subset. + # The rest of it's specs are 'shared' with the target observation. + # So... copy the calibrator specs first, then loop over the shared target/calibrator specs... + if calibrator_task_spec and 'calibrator' in task_blueprint.specifications_template.name.lower(): + # Calibrator requires related Target Task Observation for some specifications + target_task_blueprint, _ = get_related_target_observation_task_blueprint(task_blueprint) + if target_task_blueprint is None: + raise SubtaskCreationException("Cannot create calibrator observation subtask specifications from task_blueprint id=%s with template name='%s' because no related target observation task_blueprint is found" % (task_blueprint.id, task_blueprint.specifications_template.name)) + + if calibrator_task_spec.get('autoselect', True): + logger.info("auto-selecting calibrator target based on elevation of target observation...") + # Get related Target Observation Task + if "tile_beam" in target_task_spec: + subtask_spec['stations']['analog_pointing'] = { + "direction_type": target_task_spec["tile_beam"]["direction_type"], + "angle1": target_task_spec["tile_beam"]["angle1"], + "angle2": target_task_spec["tile_beam"]["angle2"]} + else: + raise SubtaskCreationException("Cannot determine the pointing specification from task_blueprint " + "id=%s in auto-select mode, because the related target observation " + "task_blueprint id=%s has no tile beam pointing defined" % ( + task_blueprint.id, target_task_blueprint.id)) + else: + subtask_spec['stations']['analog_pointing'] = {"direction_type": calibrator_task_spec["pointing"]["direction_type"], + "angle1": calibrator_task_spec["pointing"]["angle1"], + "angle2": calibrator_task_spec["pointing"]["angle2"]} + + # for the calibrator, the subbands are the union of the subbands of the targetobs + subbands = [] + for SAP in target_task_spec['SAPs']: + subbands.extend(SAP['subbands']) + subbands = sorted(list(set(subbands))) + + # for a plain calibrator, the digital pointing is equal to the analog pointing + subtask_spec['stations']['digital_pointings'].append({'name': calibrator_task_spec['name'], + 'subbands': subbands, + 'pointing': subtask_spec['stations']['analog_pointing']}) + + # At this moment of subtask creation we known which stations we *want* from the task_spec + # But we do not know yet which stations are available at the moment of observing. + # So, we decided that we set the subtask station_list as the union of all stations in all specified groups. + # This way, the user can see which stations are (likely) to be used. + # At the moment of scheduling of this subtask, then station_list is re-evaluated, and the max_nr_missing per group is validated. + subtask_spec['stations']['station_list'] = [] + if target_task_spec and "station_groups" in target_task_spec: + for station_group in target_task_spec["station_groups"]: + subtask_spec['stations']['station_list'].extend(station_group["stations"]) + # make list have unique items + subtask_spec['stations']['station_list'] = sorted(list(set(subtask_spec['stations']['station_list']))) + + if not subtask_spec['stations']['station_list']: + raise SubtaskCreationException("Cannot create observation subtask specifications for task_blueprint id=%s. No stations are defined." % (task_blueprint.id,)) + + + subtask_spec['stations']["antenna_set"] = target_task_spec["antenna_set"] + subtask_spec['stations']["filter"] = target_task_spec["filter"] + # Calculate block sizes and feed those to the spec cobalt_calculator = BlockSize(constraints=cobalt_calculator_constraints) subtask_spec["COBALT"]["blocksize"] = cobalt_calculator.blockSize - if "correlator" in task_spec: + if "correlator" in target_task_spec: subtask_spec["COBALT"]["correlator"]["blocks_per_integration"] = cobalt_calculator.nrBlocks subtask_spec["COBALT"]["correlator"]["integrations_per_block"] = cobalt_calculator.nrSubblocks @@ -412,7 +431,7 @@ def get_related_calibrator_observation_task_blueprint(target_task_blueprint: Tas raise ValueError("Cannot get a related calibrator observation task_blueprint for non-target task_blueprint id=%s template_name='%s'", target_task_blueprint.id, target_task_blueprint.specifications_template.name) - return _get_related_observation_task_blueprint(target_task_blueprint, 'calibrator observation') + return _get_related_observation_task_blueprint(target_task_blueprint, ['calibrator observation', 'parallel calibrator target observation']) def get_related_target_observation_task_blueprint(calibrator_or_beamformer_task_blueprint: TaskBlueprint) -> (TaskBlueprint, SchedulingRelationPlacement): @@ -425,19 +444,25 @@ def get_related_target_observation_task_blueprint(calibrator_or_beamformer_task_ raise ValueError("Cannot get a related target observation task_blueprint for non-calibrator/beamformer task_blueprint id=%s template_name='%s'", calibrator_or_beamformer_task_blueprint.id, calibrator_or_beamformer_task_blueprint.specifications_template.name) - return _get_related_observation_task_blueprint(calibrator_or_beamformer_task_blueprint, 'target observation') + return _get_related_observation_task_blueprint(calibrator_or_beamformer_task_blueprint, ['target observation', 'parallel calibrator target observation']) + +def _get_related_observation_task_blueprint(task_blueprint: TaskBlueprint, related_template_name: typing.Union[str, list, tuple]) -> (TaskBlueprint, SchedulingRelationPlacement): + related_template_names = [related_template_name] if isinstance(related_template_name, str) else related_template_name -def _get_related_observation_task_blueprint(task_blueprint: TaskBlueprint, related_template_name: str) -> (TaskBlueprint, SchedulingRelationPlacement): try: return next((relation.second, relation.placement) for relation in TaskSchedulingRelationBlueprint.objects.filter(first=task_blueprint).all() - if relation.second is not None and relation.second.specifications_template.name.lower() == related_template_name) + if relation.second is not None and relation.second.specifications_template.name.lower() in related_template_names) except StopIteration: try: return next((relation.first, relation.placement) for relation in TaskSchedulingRelationBlueprint.objects.filter(second=task_blueprint).all() - if relation.first is not None and relation.first.specifications_template.name.lower() == related_template_name) + if relation.first is not None and relation.first.specifications_template.name.lower() in related_template_names) except StopIteration: - logger.info("No related %s task_blueprint found for task_blueprint id=%d", related_template_name, task_blueprint.id) + if task_blueprint.specifications_template.name.lower() in related_template_names: + # the 'related' task blueprint we are looking for is itself. + return task_blueprint, None + else: + logger.debug("No related %s task_blueprint found for task_blueprint id=%d", related_template_names, task_blueprint.id) return None, None @@ -471,24 +496,7 @@ def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskB "cluster": Cluster.objects.get(name=cluster_name) } - # If we deal with a calibrator obs that runs in parallel to a target observation, add the calibrator beam to the - # existing target obs subtask. - subtask = None - if 'calibrator' in task_blueprint.specifications_template.name.lower(): - related_task_blueprint, relation = get_related_target_observation_task_blueprint(task_blueprint) - if relation and relation.value == 'parallel': - # add calibrator beam - subtask = related_task_blueprint.subtasks.filter(specifications_template__type__value=SubtaskType.Choices.OBSERVATION.value).first() - if not subtask: - raise SubtaskCreationException('Calibrator observation cannot be added to the target subtask, because it does not exist. Make sure to create a subtask from the target observation task id=%s first.' % related_task_blueprint.id) - subtask.specifications_doc['stations']['digital_pointings'] += subtask_data['specifications_doc']['stations']['digital_pointings'] - # check that the additional beam fits into the spec (observation must not result in >488 subbands across all beams) - total_subbands = sum([len(digital_pointing['subbands']) for digital_pointing in subtask.specifications_doc['stations']['digital_pointings']]) - if total_subbands > 488: # todo: should this be better handled in JSON? - raise SubtaskCreationException('Calibrator beam does not fit into the spec (results in %s total subbands, but only 488 are possible)' % total_subbands) - - if not subtask: - subtask = Subtask.objects.create(**subtask_data) + subtask = Subtask.objects.create(**subtask_data) subtask.task_blueprints.set(list(subtask.task_blueprints.all()) + [task_blueprint]) # step 2: create and link subtask input/output @@ -496,14 +504,8 @@ def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskB subtask_output = SubtaskOutput.objects.create(subtask=subtask, task_blueprint=task_blueprint) - # step 3: set state to DEFINED, unless we have a target obs with a related parallel calibrator obs - defined = True - if 'target' in task_blueprint.specifications_template.name.lower(): - _, relation = get_related_calibrator_observation_task_blueprint(task_blueprint) - if relation and relation.value == 'parallel': - defined = False - if defined: - subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) + # step 3: set state to DEFINED + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) subtask.save() return subtask @@ -1380,11 +1382,14 @@ def schedule_observation_subtask(observation_subtask: Subtask): task_blueprint.id, [(tb.id, tb.specifications_template.type) for tb in observation_subtask.task_blueprints.all()], [(out.task_blueprint.id, out.task_blueprint.specifications_template.type) for out in observation_subtask.outputs.all()])) - if 'SAPs' in task_blueprint.specifications_doc: # target - for sap in task_blueprint.specifications_doc['SAPs']: + + target_task_spec, calibrator_task_spec = _get_target_and_or_calibrator_specification(task_blueprint) + + if target_task_spec and 'SAPs' in target_task_spec: # target + for sap in target_task_spec['SAPs']: subtask_output_dict[sap['name']] = output - if 'pointing' in task_blueprint.specifications_doc: # calibrator - subtask_output_dict[task_blueprint.specifications_doc['name']] = output + if calibrator_task_spec and 'pointing' in calibrator_task_spec: # calibrator + subtask_output_dict[calibrator_task_spec['name']] = output # create SAP objects, as observations create new beams antennaset = specifications_doc['stations']['antenna_set'] diff --git a/SAS/TMSS/backend/test/t_observation_strategies_specification_and_scheduling_test.py b/SAS/TMSS/backend/test/t_observation_strategies_specification_and_scheduling_test.py index eb7f215048e75116dd5476bcb9332b572259c0db..284541b0b272deb9c39a6e1e1c0a579cab2bc59c 100755 --- a/SAS/TMSS/backend/test/t_observation_strategies_specification_and_scheduling_test.py +++ b/SAS/TMSS/backend/test/t_observation_strategies_specification_and_scheduling_test.py @@ -397,7 +397,7 @@ class TestObservationStrategiesSpecificationAndScheduling(unittest.TestCase): self.check_statuses(obs_subtask['id'], "finished", "finished", "finished") def test_lba_survey(self): - def check_parset(obs_subtask, is_target_obs: bool): + def check_parset(obs_subtask): # todo: check that these values make sense! '''helper function to check the parset for UC1 target/calibrator observations''' obs_parset = parameterset.fromString(self.tmss_client.get_subtask_parset(obs_subtask['id'])).dict() @@ -406,18 +406,16 @@ class TestObservationStrategiesSpecificationAndScheduling(unittest.TestCase): self.assertEqual('LBA_SPARSE_EVEN', obs_parset['Observation.antennaSet']) self.assertEqual('LBA_30_90', obs_parset['Observation.bandFilter']) self.assertEqual(1, int(obs_parset['Observation.nrAnaBeams'])) - self.assertEqual(2 if is_target_obs else 4, int(obs_parset['Observation.nrBeams'])) # todo: why 4? Bug? + self.assertEqual(4, int(obs_parset['Observation.nrBeams'])) self.assertEqual('Observation', obs_parset['Observation.processType']) self.assertEqual('Beam Observation', obs_parset['Observation.processSubtype']) self.assertEqual(parser.parse(obs_subtask['start_time']), parser.parse(obs_parset['Observation.startTime'])) self.assertEqual(parser.parse(obs_subtask['stop_time']), parser.parse(obs_parset['Observation.stopTime'])) self.assertEqual(200, int(obs_parset['Observation.sampleClock'])) self.assertEqual(122, len(obs_parset['Observation.Beam[0].subbandList'].split(','))) - if is_target_obs: - self.assertEqual(244, len(obs_parset['Observation.Beam[1].subbandList'].split(','))) self.assertEqual(True, strtobool(obs_parset['Observation.DataProducts.Output_Correlated.enabled'])) - self.assertEqual(488 if is_target_obs else 488, len(obs_parset['Observation.DataProducts.Output_Correlated.filenames'].split(','))) - self.assertEqual(488 if is_target_obs else 488, len(obs_parset['Observation.DataProducts.Output_Correlated.locations'].split(','))) + self.assertEqual(488, len(obs_parset['Observation.DataProducts.Output_Correlated.filenames'].split(','))) + self.assertEqual(488, len(obs_parset['Observation.DataProducts.Output_Correlated.locations'].split(','))) self.assertEqual(False, strtobool(obs_parset.get('Observation.DataProducts.Output_CoherentStokes.enabled', 'false'))) self.assertEqual(False, strtobool(obs_parset.get('Observation.DataProducts.Output_IncoherentStokes.enabled', 'false'))) self.assertEqual(False, strtobool(obs_parset.get('Observation.DataProducts.Output_Pulsar.enabled', 'false'))) @@ -458,54 +456,44 @@ class TestObservationStrategiesSpecificationAndScheduling(unittest.TestCase): # check the tasks tasks = scheduling_unit_blueprint_ext['task_blueprints'] - self.assertEqual(7, len(tasks)) + self.assertEqual(6, len(tasks)) observation_tasks = [t for t in tasks if t['task_type'] == 'observation'] - self.assertEqual(2, len(observation_tasks)) + self.assertEqual(1, len(observation_tasks)) pipeline_tasks = [t for t in tasks if t['task_type'] == 'pipeline'] self.assertEqual(4, len(pipeline_tasks)) self.assertEqual(1, len([t for t in tasks if t['task_type'] == 'ingest'])) ingest_task = next(t for t in tasks if t['task_type'] == 'ingest') - cal_obs_task = next(t for t in observation_tasks if t['name'] == 'Calibrator Observation') - target_obs_task = next(t for t in observation_tasks if t['name'] == 'Target Observation') + obs_task = next(t for t in observation_tasks if t['name'] == 'Combined Observation') # ------------------- - # schedule simultaneous calibrator and target obs (two tasks -> one subtask) - self.assertEqual(1, len([st for st in cal_obs_task['subtasks'] if st['subtask_type'] == 'observation'])) - cal_obs_subtask = next(st for st in cal_obs_task['subtasks'] if st['subtask_type'] == 'observation') - - self.assertEqual(1, len([st for st in target_obs_task['subtasks'] if st['subtask_type'] == 'observation'])) - target_obs_subtask = next(st for st in target_obs_task['subtasks'] if st['subtask_type'] == 'observation') - - self.assertEqual(cal_obs_subtask['id'], target_obs_subtask['id']) - cal_obs_subtask = self.tmss_client.schedule_subtask(cal_obs_subtask['id']) + # schedule combined calibrator and target obs subtask + self.assertEqual(1, len([st for st in obs_task['subtasks'] if st['subtask_type'] == 'observation'])) + obs_subtask = next(st for st in obs_task['subtasks'] if st['subtask_type'] == 'observation') + obs_subtask = self.tmss_client.schedule_subtask(obs_subtask['id']) - check_parset(cal_obs_subtask, is_target_obs=False) - self.check_statuses(cal_obs_subtask['id'], "scheduled", "scheduled", "scheduled") + check_parset(obs_subtask) + self.check_statuses(obs_subtask['id'], "scheduled", "scheduled", "scheduled") # check output_dataproducts - cal_obs_output_dataproducts = self.tmss_client.get_subtask_output_dataproducts(cal_obs_subtask['id']) - self.assertEqual(488, len(cal_obs_output_dataproducts)) + obs_output_dataproducts = self.tmss_client.get_subtask_output_dataproducts(obs_subtask['id']) + self.assertEqual(488, len(obs_output_dataproducts)) # "mimic" that the cal_obs_subtask starts running - set_subtask_state_following_allowed_transitions(cal_obs_subtask['id'], 'started') - self.check_statuses(cal_obs_subtask['id'], "started", "started", "observing") + set_subtask_state_following_allowed_transitions(obs_subtask['id'], 'started') + self.check_statuses(obs_subtask['id'], "started", "started", "observing") - # "mimic" that the cal_obs_subtask finished (including qa subtasks) - for subtask in cal_obs_task['subtasks']: + # "mimic" that all obs_task subtask finished (including qa subtasks) + for subtask in obs_task['subtasks']: set_subtask_state_following_allowed_transitions(subtask['id'], 'finished') - # todo: check_statuses does not allow for different statuses of multiple related tasks, so we cannot do - # self.check_statuses(cal_obs_subtask['id'], "finished", "finished", "observed") - # but have to assert here directly. (Why are the statuses for target and calibrator tasks inconsistent here?) - cal_obs_subtask = self.tmss_client.get_subtask(cal_obs_subtask['id']) - self.assertEqual("finished", cal_obs_subtask['state_value']) - cal_obs_task = self.tmss_client.get_url_as_json_object(cal_obs_task['url']) - self.assertEqual("finished", cal_obs_task['status']) - target_obs_task = self.tmss_client.get_url_as_json_object(target_obs_task['url']) - self.assertEqual("observed", target_obs_task['status']) - schedunit = self.tmss_client.get_url_as_json_object(scheduling_unit_blueprint['url']) - self.assertEqual("observed", schedunit['status']) + # refresh all subtasks/tasks/schedunits for latest values + obs_subtask = self.tmss_client.get_subtask(obs_subtask['id']) + self.assertEqual("finished", obs_subtask['state_value']) + obs_task = self.tmss_client.get_url_as_json_object(obs_task['url']) + self.assertEqual("finished", obs_task['status']) + scheduling_unit_blueprint = self.tmss_client.get_url_as_json_object(scheduling_unit_blueprint['url']) + self.assertEqual("observed", scheduling_unit_blueprint['status']) # ------------------- # check pipelines @@ -597,7 +585,7 @@ class TestObservationStrategiesSpecificationAndScheduling(unittest.TestCase): # "mimic" that the target_pipe3_subtask finished set_subtask_state_following_allowed_transitions(target_pipe3_subtask['id'], 'finished') - self.check_statuses(target_pipe3_subtask['id'], "finished", "finished", "processing") + self.check_statuses(target_pipe3_subtask['id'], "finished", "finished", "processed") logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) diff --git a/SAS/TMSS/backend/test/t_scheduling.py b/SAS/TMSS/backend/test/t_scheduling.py index eb45bcb9232ec1fe75a762e60c68d528609f83af..92d2f27e7d4a8f7540899d2426acbaace81fc569 100755 --- a/SAS/TMSS/backend/test/t_scheduling.py +++ b/SAS/TMSS/backend/test/t_scheduling.py @@ -650,64 +650,6 @@ class SubtaskInputOutputTest(unittest.TestCase): self.assertEqual(set(pipe_in1.dataproducts.all()), {dp1_1, dp1_3}) self.assertEqual(set(pipe_in2.dataproducts.all()), {dp2_2}) - @mock.patch("lofar.sas.tmss.tmss.tmssapp.subtasks.assign_or_unassign_resources") - def test_combined_target_calibrator_subtask_connects_dataproducts_to_correct_output(self, assign_resources_mock): - """ - Create a subtask that combines a target and parallel calibrator observation. - Schedule the subtask and assert that dataproducts are assigned to both outputs. - """ - - # setup tasks - cal_task_template = models.TaskTemplate.objects.get(name="calibrator observation") - cal_task_spec = get_default_json_object_for_schema(cal_task_template.schema) - - cal_task_draft = models.TaskDraft.objects.create(**TaskDraft_test_data(specifications_template=cal_task_template, specifications_doc=cal_task_spec)) - cal_task_blueprint = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(task_draft=cal_task_draft)) - - target_task_template = models.TaskTemplate.objects.get(name="target observation") - target_task_spec = get_default_json_object_for_schema(target_task_template.schema) - target_task_draft = models.TaskDraft.objects.create(**TaskDraft_test_data(specifications_template=target_task_template, specifications_doc=target_task_spec)) - target_task_blueprint = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(task_draft=target_task_draft, - scheduling_unit_blueprint=cal_task_blueprint.scheduling_unit_blueprint)) - - models.TaskSchedulingRelationBlueprint.objects.create(first=cal_task_blueprint, second=target_task_blueprint, - placement=models.SchedulingRelationPlacement.objects.get(value='parallel')) - - # specify two beams with known number of subbands - target_task_blueprint.specifications_doc['SAPs'] = [{'name': 'target1_combined', 'target': 'target1_combined', 'subbands': [0, 1], - 'digital_pointing': {'angle1': 0.1, 'angle2': 0.1, - 'direction_type': 'J2000'}}, - {'name': 'target2_combined', 'target': 'target1_combined', 'subbands': [2, 3, 4], - 'digital_pointing': {'angle1': 0.1, 'angle2': 0.1, - 'direction_type': 'J2000'}} - ] - target_task_blueprint.save() - cal_task_blueprint.specifications_doc['name'] = "calibrator_combined" - cal_task_blueprint.save() - - # create subtask - create_observation_control_subtask_from_task_blueprint(target_task_blueprint) - subtask = create_observation_control_subtask_from_task_blueprint(cal_task_blueprint) - subtask.start_time = datetime.utcnow() - subtask.stop_time = datetime.utcnow() - subtask.save() - - # assert no dataproducts are connected before scheduling - target_output = subtask.outputs.filter(task_blueprint=target_task_blueprint).first() - cal_output = subtask.outputs.filter(task_blueprint=cal_task_blueprint).first() - self.assertEqual(target_output.dataproducts.count(), 0) - self.assertEqual(cal_output.dataproducts.count(), 0) - - # schedule, and assert subtask state - self.assertEqual('defined', subtask.state.value) - schedule_observation_subtask(subtask) - self.assertEqual('scheduled', subtask.state.value) - - # assert dataproducts are connected to both outputs after scheduling - # task and calibrator tasks should each have associated one dataproduct per subband of the target task - self.assertEqual(target_output.dataproducts.count(), 5) - self.assertEqual(cal_output.dataproducts.count(), 5) - class SAPTest(unittest.TestCase): """