diff --git a/SAS/ResourceAssignment/TaskPrescheduler/lib/cobaltblocksize.py b/SAS/ResourceAssignment/TaskPrescheduler/lib/cobaltblocksize.py index daa5266fc31381ea20a84d6200d696383b0608e9..ac14727d9a2c2645de608bf7454bd9bf60e30175 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/lib/cobaltblocksize.py +++ b/SAS/ResourceAssignment/TaskPrescheduler/lib/cobaltblocksize.py @@ -47,7 +47,7 @@ class BlockConstraints(object): """ Provide the constraints for the block size, as derived from the correlator and beamformer settings. """ - def __init__(self, correlatorSettings=None, coherentStokesSettings=None, incoherentStokesSettings=None, clockMHz=200): + def __init__(self, correlatorSettings=None, coherentStokesSettings=[], incoherentStokesSettings=[], clockMHz=200): self.correlator = correlatorSettings self.coherentStokes = coherentStokesSettings self.incoherentStokes = incoherentStokesSettings @@ -107,28 +107,28 @@ class BlockConstraints(object): # Correlator.cu (minimum of 16 samples per channel) factor = lcm(factor, CORRELATOR_BLOCKSIZE * self.correlator.nrChannelsPerSubband * self.nrSubblocks()) - if self.coherentStokes: + for coherentStokes in self.coherentStokes: # DelayAndBandPass.cu factor = lcm(factor, BEAMFORMER_DELAYCOMPENSATION_BLOCKSIZE * BEAMFORMER_NR_DELAYCOMPENSATION_CHANNELS) # FIR_Filter.cu - factor = lcm(factor, NR_PPF_TAPS * self.coherentStokes.nrChannelsPerSubband) + factor = lcm(factor, NR_PPF_TAPS * coherentStokes.nrChannelsPerSubband) # CoherentStokesKernel.cc - factor = lcm(factor, MAX_THREADS_PER_BLOCK * self.coherentStokes.timeIntegrationFactor) + factor = lcm(factor, MAX_THREADS_PER_BLOCK * coherentStokes.timeIntegrationFactor) #CoherentStokes.cu (integration should fit) - factor = lcm(factor, 1024 * self.coherentStokes.timeIntegrationFactor * self.coherentStokes.nrChannelsPerSubband) + factor = lcm(factor, 1024 * coherentStokes.timeIntegrationFactor * coherentStokes.nrChannelsPerSubband) - if self.incoherentStokes: + for incoherentStokes in self.incoherentStokes: # DelayAndBandPass.cu factor = lcm(factor, BEAMFORMER_DELAYCOMPENSATION_BLOCKSIZE * BEAMFORMER_NR_DELAYCOMPENSATION_CHANNELS) # FIR_Filter.cu - factor = lcm(factor, NR_PPF_TAPS * self.incoherentStokes.nrChannelsPerSubband) + factor = lcm(factor, NR_PPF_TAPS * incoherentStokes.nrChannelsPerSubband) # IncoherentStokes.cu (integration should fit) - factor = lcm(factor, 1024 * self.incoherentStokes.timeIntegrationFactor * self.incoherentStokes.nrChannelsPerSubband) + factor = lcm(factor, 1024 * incoherentStokes.timeIntegrationFactor * incoherentStokes.nrChannelsPerSubband) return factor diff --git a/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py b/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py index e3cf4e6ccc1730279de43c26cb2617b56709e09a..5cf07d6b85ab9866355c1a352df47d1a3697e1ab 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py +++ b/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py @@ -69,7 +69,7 @@ def calculateCobaltSettings(spec): incoherent = None clock = parset["Observation.sampleClock"] - constraints = BlockConstraints(corr, coherent, incoherent, clock) + constraints = BlockConstraints(corr, [coherent], [incoherent], clock) calculator = BlockSize(constraints) return {'nrSubblocks': calculator.nrSubblocks, 'blockSize': calculator.blockSize, diff --git a/SAS/ResourceAssignment/TaskPrescheduler/test/t_cobaltblocksize.py b/SAS/ResourceAssignment/TaskPrescheduler/test/t_cobaltblocksize.py index 
fe7acef4cf0ab8d1c2fb3baa6938b2eeacfa7e1b..8eaec011e3fd642723377b9ace171db8a687dfd1 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/test/t_cobaltblocksize.py +++ b/SAS/ResourceAssignment/TaskPrescheduler/test/t_cobaltblocksize.py @@ -56,7 +56,7 @@ class TestBlockConstraints(unittest.TestCase): coh.nrChannelsPerSubband = 16 coh.timeIntegrationFactor = 4 - c = BlockConstraints(coherentStokesSettings=coh) + c = BlockConstraints(coherentStokesSettings=[coh]) self.assertEqual(c.nrSubblocks(), 1) self.assertGreaterEqual(c.factor(), 1) @@ -69,7 +69,7 @@ class TestBlockConstraints(unittest.TestCase): incoh.nrChannelsPerSubband = 16 incoh.timeIntegrationFactor = 4 - c = BlockConstraints(incoherentStokesSettings=incoh) + c = BlockConstraints(incoherentStokesSettings=[incoh]) self.assertEqual(c.nrSubblocks(), 1) self.assertGreaterEqual(c.factor(), 1) @@ -94,7 +94,7 @@ class TestBlockSize(unittest.TestCase): correlator.nrChannelsPerSubband = 64 correlator.integrationTime = integrationTime - c = BlockConstraints( correlator, None, None ) + c = BlockConstraints(correlator) bs = BlockSize(c) self.assertAlmostEquals(c._samples2time(bs.integrationSamples), integrationTime, delta = integrationTime * 0.05) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py index a4fd63788ffff44af3696a8b2c3e4be9999e4d49..68ed8f2dd2893a96bd4e678935fcf0b60cdec4ea 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py @@ -107,15 +107,20 @@ def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: d parset[beam_prefix+"Correlator.angle2"] = phase_center['pointing']['angle2'] - # TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work - subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id)) - subtask_output_ids = [o.id for o in subtask_outputs] + dataproducts = list(subtask.output_dataproducts.filter(dataformat__value=Dataformat.Choices.MEASUREMENTSET.value).filter(datatype__value=Datatype.Choices.VISIBILITIES.value).order_by('filename')) - # TODO: don't assume ordering by filename is sufficient: we need to inspect the dataproduct properties to make sure saps and subbands are in the correct order - dataproducts = list(models.Dataproduct.objects.filter(producer_id__in=subtask_output_ids).filter(dataformat=Dataformat.Choices.MEASUREMENTSET.value).filter(datatype=Datatype.Choices.VISIBILITIES).order_by('filename')) + # marshall dataproducts, but only if they're supplied. in some use cases, we want a parset before the subtask is scheduled. 
+ correlator_dataproducts = [] + for digi_beam in digi_beams: + for subband in digi_beam["subbands"]: + dataproduct = [dp for dp in dataproducts + if dp.specifications_doc["sap"] == digi_beam['name'] + and dp.specifications_doc["subband"] == subband] - parset["Observation.DataProducts.Output_Correlated.filenames"] = [dp.filename for dp in dataproducts] - parset["Observation.DataProducts.Output_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in dataproducts] + correlator_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct) + + parset["Observation.DataProducts.Output_Correlated.filenames"] = [dp.filename for dp in correlator_dataproducts] + parset["Observation.DataProducts.Output_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in correlator_dataproducts] # mimic MoM placeholder thingy (the resource estimator parses this) parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))] @@ -129,12 +134,8 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d parset = {} - # TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work - subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id)) - subtask_output_ids = [o.id for o in subtask_outputs] - # TODO: don't assume ordering by filename is sufficient: we need to inspect the dataproduct properties to make sure saps and subbands are in the correct order - dataproducts = list(models.Dataproduct.objects.filter(producer_id__in=subtask_output_ids).filter(dataformat=Dataformat.Choices.MEASUREMENTSET.value).filter(datatype=Datatype.Choices.TIME_SERIES.value).order_by('filename')) + dataproducts = list(subtask.output_dataproducts.filter(dataformat__value=Dataformat.Choices.BEAMFORMED.value).filter(datatype__value=Datatype.Choices.TIME_SERIES.value).order_by('filename')) # Lists of coherent and incoherent dataproducts that will be produced, in the order COBALT wants them coherent_dataproducts = [] @@ -174,11 +175,17 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d # marshall dataproducts, but only if they're supplied. in some use cases, we want a parset before the subtask is scheduled. 
for s in range(nr_stokes): for p in range(nr_parts): - # TODO: don't assume ordering by filename is sufficient: we need to inspect the dataproduct properties to make sure saps and subbands are in the correct order + dataproduct = [dp for dp in dataproducts + if dp.specifications_doc["sap"] == sap['name'] + and dp.specifications_doc["identifiers"]["pipeline_index"] == pipeline_idx + and dp.specifications_doc["identifiers"]["tab_index"] == field_idx + and dp.specifications_doc["identifiers"]["stokes_index"] == s + and dp.specifications_doc["identifiers"]["part_index"] == p + and dp.specifications_doc["coherent"] == tab['coherent']] if tab['coherent']: - coherent_dataproducts.append(dataproducts.pop(0) if dataproducts else null_dataproduct) + coherent_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct) else: - incoherent_dataproducts.append(dataproducts.pop(0) if dataproducts else null_dataproduct) + incoherent_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct) if cobalt_version >= 2: pipeline_parset['Beam[%s].subbandList' % sap_idx] = sap['subbands'] @@ -192,7 +199,8 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d beamformer_pipeline_parsets.append(pipeline_parset) # Process fly's eye pipelines - for pipeline in spec['COBALT']['beamformer']['flyseye_pipelines']: + pipeline_idx_offset = len(beamformer_pipeline_parsets) + for pipeline_idx, pipeline in enumerate(spec['COBALT']['beamformer']['flyseye_pipelines'], start=pipeline_idx_offset): pipeline_parset = {} pipeline_parset.update(_add_prefix(_stokes_settings_parset_subkeys(pipeline['coherent']), "CoherentStokes.")) pipeline_parset['flysEye'] = True @@ -206,7 +214,7 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d antennaset = spec['stations']['antenna_set'] fields = sum([list(antenna_fields(station, antennaset)) for station in stations], []) - for field in fields: + for field_idx, field in enumerate(fields): stokes_settings = pipeline['coherent'] nr_subbands = len(sap['subbands']) @@ -216,8 +224,14 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d # marshall dataproducts, but only if they're supplied. in some use cases, we want a parset before the subtask is scheduled. 
for s in range(nr_stokes): for p in range(nr_parts): - # TODO: don't assume ordering by filename is sufficient: we need to inspect the dataproduct properties to make sure saps and subbands are in the correct order + dataproduct = [dp for dp in dataproducts + if dp.specifications_doc["sap"] == sap['name'] + and dp.specifications_doc["identifiers"]["pipeline_index"] == pipeline_idx + and dp.specifications_doc["identifiers"]["tab_index"] == field_idx + and dp.specifications_doc["identifiers"]["stokes_index"] == s + and dp.specifications_doc["identifiers"]["part_index"] == p + and dp.specifications_doc["coherent"]] - coherent_dataproducts.append(dataproducts.pop(0) if dataproducts else null_dataproduct) + coherent_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct) if cobalt_version >= 2: pipeline_parset['Beam[%s].stationList' % sap_idx] = pipeline['stations'] @@ -519,9 +533,15 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) # TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id)) - out_dataproducts = [] - for subtask_output in subtask_outputs: - out_dataproducts = list(models.Dataproduct.objects.filter(producer_id=subtask_output.id)) + unsorted_out_dataproducts = sum([list(models.Dataproduct.objects.filter(producer_id=subtask_output.id)) for subtask_output in subtask_outputs],[]) + + def find_dataproduct(dataproducts: list, specification_doc: dict): + hits = [dp for dp in dataproducts if dp.specifications_doc['sap'] == specification_doc['sap'] + and dp.specifications_doc['subband'] == specification_doc['subband']] + return hits[0] if hits else null_dataproduct + + # list output dataproducts in the same order as input dataproducts, matched by the identifiers + out_dataproducts = [find_dataproduct(unsorted_out_dataproducts, in_dp.specifications_doc) for in_dp in in_dataproducts] parset["Observation.DataProducts.Output_Correlated.enabled"] = "true" parset["Observation.DataProducts.Output_Correlated.filenames"] = "[%s]" % ",".join([dp.filename for dp in out_dataproducts]) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-timeseries-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-timeseries-1.json new file mode 100644 index 0000000000000000000000000000000000000000..d11ec11cc085263e455984410ad0f4e3dcc8e5ca --- /dev/null +++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-timeseries-1.json @@ -0,0 +1,70 @@ +{ + "$id":"http://tmss.lofar.org/api/schemas/dataproductspecificationstemplate/timeseries/1#", + "$schema": "http://json-schema.org/draft-06/schema#", + "title": "timeseries", + "type": "object", + "default": {}, + "properties": { + "sap": { + "type": "string", + "title": "SAP", + "default": "" + }, + "identifiers": { + "title": "Identifiers", + "description": "Identification of this dataproduct within the producing subtask.", + "type": "object", + "default": {}, + "properties": { + "sap_index": { + "title": "SAP index", + "type": "integer", + "default": 0, + "minimum": 0 + }, + "pipeline_index": { + "title": "Pipeline index", + "description": "Index of beamformer pipeline within COBALT", + "type": "integer", + "default": 0, + "minimum": 0 + }, + "tab_index": { + "title": "TAB index", + "description": "TAB index within the SAP", + "type": "integer", + "default": 0, + "minimum": 0 + }, + "part_index": { + "title": "Part 
index", + "description": "Part index within the TAB", + "type": "integer", + "default": 0, + "minimum": 0 + }, + "stokes_index": { + "title": "Stokes index", + "description": "Stokes index within the TAB", + "type": "integer", + "default": 0, + "minimum": 0, + "maximum": 3 + }, + "coherent": { + "title": "Coherent", + "description": "TAB is a coherent addition", + "type": "boolean", + "default": true + } + }, + "required": [ + "sap_index", + "tab_index", + "part_index", + "stokes_index", + "coherent" + ] + } + }, + "required": [ "identifiers" ] +} diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-visibilities-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-visibilities-1.json new file mode 100644 index 0000000000000000000000000000000000000000..161f96803940afef59c4ceaf35787ad6012f5e66 --- /dev/null +++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-visibilities-1.json @@ -0,0 +1,22 @@ +{ + "$id":"http://tmss.lofar.org/api/schemas/dataproductspecificationstemplate/visibilities/1#", + "$schema": "http://json-schema.org/draft-06/schema#", + "title": "visibilities", + "type": "object", + "default": {}, + "properties": { + "sap": { + "type": "string", + "title": "SAP", + "default": "" + }, + "subband": { + "type": "integer", + "title": "subband number", + "default": 0, + "minimum": 0, + "maximum": 511 + } + }, + "required": [ "sap", "subband" ] +} diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-observation-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-observation-1.json index 985274ec00ccab31533717ae489dee21ad4a6b14..3555487e83beaf29a2c66bab6f7327c4cf6cee99 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-observation-1.json +++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-observation-1.json @@ -98,7 +98,7 @@ "type":"integer", "title":"Specification version", "description":"Version of the COBALT specification to emit", - "default":2, + "default":1, "minimum":1, "maximum":2 }, diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/templates.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/templates.json index 480d7a4abb715673befa1742ef8fedb6ac04a00f..1f81d5e5cbe42fa1f8255e757b441ea0f1adc8d4 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/templates.json +++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/templates.json @@ -35,6 +35,14 @@ "file_name": "dataproduct_specifications_template-empty-1.json", "template": "dataproduct_specifications_template" }, + { + "file_name": "dataproduct_specifications_template-timeseries-1.json", + "template": "dataproduct_specifications_template" + }, + { + "file_name": "dataproduct_specifications_template-visibilities-1.json", + "template": "dataproduct_specifications_template" + }, { "file_name": "dataproduct_feedback_template-empty-1.json", "template": "dataproduct_feedback_template" diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 60499017cdc4a4247f7716881ca2840f45ffb96a..856c523be56c5a471099ab484f6eb04412b678a8 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -4,12 +4,14 @@ logger = logging.getLogger(__name__) from copy import deepcopy from functools import cmp_to_key from collections.abc import Iterable +from math import ceil from lofar.common.ring_coordinates import RingCoordinates from lofar.common.datetimeutils import formatDatetime, 
round_to_second_precision from lofar.common import isProductionEnvironment from lofar.common.json_utils import add_defaults_to_json_object_for_schema, get_default_json_object_for_schema from lofar.common.lcu_utils import get_current_stations +from lofar.stationmodel.antennafields import antenna_fields from lofar.sas.tmss.tmss.exceptions import SubtaskCreationException, SubtaskSchedulingException, SubtaskException @@ -20,7 +22,7 @@ from lofar.sas.tmss.tmss.tmssapp.models import * from lofar.sas.resourceassignment.resourceassigner.rarpc import RARPC from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset_dict -from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import CorrelatorSettings, BlockConstraints, BlockSize +from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize from lofar.sas.resourceassignment.resourceassigner.schedulers import ScheduleException from lofar.sas.tmss.tmss.tmssapp.conversions import antennafields_for_antennaset_and_station @@ -73,8 +75,9 @@ def create_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subta task_blueprint.id, task_blueprint.name, task_blueprint.specifications_template.type.value, task_blueprint.scheduling_unit_blueprint.id) subtasks.append(subtask) - except SubtaskCreationException as e: - logger.error(e) + except Exception as e: + logger.exception(e) + raise SubtaskCreationException('Cannot create subtasks for task id=%s for its schema name=%s in generator %s' % (task_blueprint.pk, template_name, generator)) from e return subtasks else: logger.error('Cannot create subtasks for task id=%s because no generator exists for its schema name=%s' % (task_blueprint.pk, template_name)) @@ -152,6 +155,9 @@ def create_observation_subtask_specifications_from_observation_task_blueprint(ta # now go over the settings in the task_spec and 'copy'/'convert' them to the subtask_spec task_spec = task_blueprint.specifications_doc + # block size calculator will need to be fed all the relevant specs + cobalt_calculator_constraints = BlockConstraints(None, [], []) + # The calibrator has a minimal calibration-specific specification subset. # The rest of it's specs are 'shared' with the target observation. # So... copy the calibrator specs first, then loop over the shared target/calibrator specs... @@ -195,6 +201,17 @@ def create_observation_subtask_specifications_from_observation_task_blueprint(ta logger.info("Using station and correlator settings for calibrator observation task_blueprint id=%s from target observation task_blueprint id=%s", task_blueprint.id, target_task_blueprint.id) + # correlator + subtask_spec["COBALT"]["correlator"] = { "enabled": False } + + if "correlator" in task_spec: + subtask_spec["COBALT"]["correlator"]["enabled"] = True + subtask_spec["COBALT"]["correlator"]["channels_per_subband"] = task_spec["correlator"]["channels_per_subband"] + + corr = CorrelatorSettings() + corr.nrChannelsPerSubband = task_spec["correlator"]["channels_per_subband"] + corr.integrationTime = task_spec["correlator"]["integration_time"] + cobalt_calculator_constraints.correlator = corr # At this moment of subtask creation we known which stations we *want* from the task_spec # But we do not know yet which stations are available at the moment of observing. 
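The hunks above and below replace the old one-shot, correlator-only block size computation with a single BlockConstraints object that is filled in while the task specification is parsed: the correlator settings and one StokesSettings entry per beamformer pipeline all land in the same constraints object before BlockSize is solved once. A minimal sketch of that accumulation pattern, using the class and attribute names from cobaltblocksize.py above; the settings values themselves are illustrative only:

    from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import \
        CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize

    constraints = BlockConstraints(None, [], [])   # start empty, fill while parsing the spec

    corr = CorrelatorSettings()
    corr.nrChannelsPerSubband = 64                 # example value
    corr.integrationTime = 1.0                     # example value, in seconds
    constraints.correlator = corr

    ss = StokesSettings()                          # one entry per beamformer pipeline
    ss.nrChannelsPerSubband = 16
    ss.timeIntegrationFactor = 4
    constraints.coherentStokes.append(ss)

    # one solve satisfies all registered constraints at once (via the lcm of their factors)
    calculator = BlockSize(constraints=constraints)
    print(calculator.blockSize, calculator.nrBlocks, calculator.nrSubblocks)
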
@@ -215,70 +232,89 @@ def create_observation_subtask_specifications_from_observation_task_blueprint(ta # The beamformer obs has a beamformer-specific specification block. # The rest of it's specs is the same as in a target observation. # So... copy the beamformer specs first, then loop over the shared specs... - if 'beamforming' in task_blueprint.specifications_template.name.lower(): + if 'beamformers' in task_spec: subtask_spec['COBALT']['beamformer']['tab_pipelines'] = [] subtask_spec['COBALT']['beamformer']['flyseye_pipelines'] = [] - if 'beamformers' in task_spec: - for task_beamformer_spec in task_spec['beamformers']: - task_beamformer_spec = deepcopy(task_beamformer_spec) - - # the wanted/specified beamformer station list is the intersecion of the observation station list with the requested beamformer stations. - # at the moment of scheduling this list is re-evaluated for available stations, and the max_nr_missing is evaluated as well. - # this intersection is not needed per se, because COBALT plays nicely and does similar filtering for stations that are actually available, - # but hey, if cobalt can play nice, then so can we! :) - # So, let's come up with the correct complete beamforming-stations-list, and ask cobalt to explicitely uses these. - beamformer_station_list = [] - if "station_groups" in task_beamformer_spec: - # combine all stations in the groups... - for station_group in task_beamformer_spec["station_groups"]: - beamformer_station_list.extend(station_group["stations"]) - - # make intersection with observing-stations... - beamformer_station_set = set(beamformer_station_list).intersection(set(subtask_spec['stations']['station_list'])) - - # make it a nice readable sorted list. - beamformer_station_list = sorted(list(beamformer_station_list)) - # use the beamformer_station_list below for the tab pipeline and/or flys eye - - for stokes_type in ["coherent", "incoherent"]: - if stokes_type in task_beamformer_spec: - # SAPs - saps = task_beamformer_spec[stokes_type]["SAPs"] - for sap in saps: - # determine absolute tab pointing for subtask by adding relative tab pointing from task to target sap pointing - target_sap = _get_related_target_sap_by_name(task_blueprint, sap['name']) - if "tabs" in sap: - for tab in sap["tabs"]: - tab['coherent'] = (stokes_type == "coherent") - if "relative" in tab: - if tab.pop("relative"): - tab['pointing'] = _add_pointings(tab['pointing'], target_sap['digital_pointing']) - elif stokes_type == 'incoherent': - sap.setdefault('tabs', []) - sap["tabs"] += [{'coherent': False}] # todo: according to confluence. Is that needed? 
- if "tab_rings" in sap: - ring_pointings = _generate_tab_ring_pointings(target_sap["digital_pointing"], sap.pop("tab_rings")) - sap['tabs'] += [{'coherent': (stokes_type == "coherent"), 'pointing': pointing} for pointing in ring_pointings] - if "subbands" in sap: - sap['subbands'] = _filter_subbands(target_sap['subbands'], sap['subbands']) - - # create a pipeline item and add it to the list - beamformer_pipeline = {stokes_type: task_beamformer_spec[stokes_type]["settings"], - "stations": beamformer_station_list, - "SAPs": saps} - subtask_spec['COBALT']['beamformer']['tab_pipelines'].append(beamformer_pipeline) - if task_beamformer_spec['flys eye'].get("enabled", False): - flyseye_pipeline = {"coherent": task_beamformer_spec["flys eye"]["settings"], - "stations": beamformer_station_list} - subtask_spec['COBALT']['beamformer']['flyseye_pipelines'].append(flyseye_pipeline) - # todo: Clarify if we can add a subbands_selection on the flys eye task spec, to filter down for sap['subbands'] - # If I got that correctly, specifying subbands is not really supported later down the chain, so whatever we do here gets ignored anyway? - # for sap in task_spec["SAPs"]: - # target_sap = _get_related_target_sap_by_name(task_blueprint, sap['name']) - # sap['subbands'] = filter_subbands(...) - # if sap['subbands'] == target_sap['subbands']: # todo: is this really required? pseudo-code in confluence suggests so, but what harm does the list do? - # sap['subbands'] = [] + for task_beamformer_spec in task_spec['beamformers']: + # the wanted/specified beamformer station list is the intersecion of the observation station list with the requested beamformer stations. + # at the moment of scheduling this list is re-evaluated for available stations, and the max_nr_missing is evaluated as well. + # this intersection is not needed per se, because COBALT plays nicely and does similar filtering for stations that are actually available, + # but hey, if cobalt can play nice, then so can we! :) + # So, let's come up with the correct complete beamforming-stations-list, and ask cobalt to explicitely uses these. + + # combine all stations in the groups... + beamformer_station_list = sum([station_group["stations"] for station_group in task_beamformer_spec["station_groups"]], []) + + # make intersection with observing-stations... + beamformer_station_set = set(beamformer_station_list).intersection(set(subtask_spec['stations']['station_list'])) + + # make it a nice readable sorted list. 
+ beamformer_station_list = sorted(list(beamformer_station_set)) + # use the beamformer_station_list below for the tab pipeline and/or flys eye + + for stokes_type in ["coherent", "incoherent"]: + if not task_beamformer_spec[stokes_type]["SAPs"]: + # nothing specified for this stokes type + continue + + # SAPs + subtask_saps = [] + for sap in task_beamformer_spec[stokes_type]["SAPs"]: + subtask_sap = { "name": sap["name"], "tabs": [] } + + target_sap = _get_related_target_sap_by_name(task_blueprint, sap['name']) + if stokes_type == "coherent": + for tab in sap["tabs"]: + subtask_sap["tabs"].append({ + "coherent": True, + # determine absolute tab pointing for subtask by adding relative tab pointing from task to target sap pointing + "pointing": tab["pointing"] if not tab.get("relative", False) else _add_pointings(tab['pointing'], target_sap['digital_pointing']) + }) + + if "tab_rings" in sap: + ring_pointings = _generate_tab_ring_pointings(target_sap["digital_pointing"], sap.pop("tab_rings")) + subtask_sap['tabs'] += [{'coherent': True, 'pointing': pointing} for pointing in ring_pointings] + else: + subtask_sap["tabs"] = [{"coherent": False}] + + if "subbands" in sap: + sap['subbands'] = _filter_subbands(target_sap['subbands'], sap['subbands']) + + subtask_saps.append(subtask_sap) + + # create a pipeline item and add it to the list + beamformer_pipeline = {stokes_type: task_beamformer_spec[stokes_type]["settings"], + "stations": beamformer_station_list, + "SAPs": subtask_saps} + subtask_spec['COBALT']['beamformer']['tab_pipelines'].append(beamformer_pipeline) + + # add constraints for calculator + ss = StokesSettings() + ss.nrChannelsPerSubband = task_beamformer_spec[stokes_type]["settings"]["channels_per_subband"] + ss.timeIntegrationFactor = task_beamformer_spec[stokes_type]["settings"]["time_integration_factor"] + if stokes_type == "coherent": + cobalt_calculator_constraints.coherentStokes.append(ss) + else: + cobalt_calculator_constraints.incoherentStokes.append(ss) + + if task_beamformer_spec['flys eye']['enabled']: + # add constraints for calculator + ss = StokesSettings() + ss.nrChannelsPerSubband = task_beamformer_spec["flys eye"]["settings"]["channels_per_subband"] + ss.timeIntegrationFactor = task_beamformer_spec["flys eye"]["settings"]["time_integration_factor"] + cobalt_calculator_constraints.coherentStokes.append(ss) + + flyseye_pipeline = {"coherent": task_beamformer_spec["flys eye"]["settings"], + "stations": beamformer_station_list} + subtask_spec['COBALT']['beamformer']['flyseye_pipelines'].append(flyseye_pipeline) + # todo: Clarify if we can add a subbands_selection on the flys eye task spec, to filter down for sap['subbands'] + # If I got that correctly, specifying subbands is not really supported later down the chain, so whatever we do here gets ignored anyway? + # for sap in task_spec["SAPs"]: + # target_sap = _get_related_target_sap_by_name(task_blueprint, sap['name']) + # sap['subbands'] = filter_subbands(...) + # if sap['subbands'] == target_sap['subbands']: # todo: is this really required? pseudo-code in confluence suggests so, but what harm does the list do? 
+ # sap['subbands'] = [] subtask_spec['stations']["antenna_set"] = task_spec["antenna_set"] subtask_spec['stations']["filter"] = task_spec["filter"] @@ -301,15 +337,15 @@ def create_observation_subtask_specifications_from_observation_task_blueprint(ta "angle1": task_spec["tile_beam"]["angle1"], "angle2": task_spec["tile_beam"]["angle2"] } + + + # Calculate block sizes and feed those to the spec + cobalt_calculator = BlockSize(constraints=cobalt_calculator_constraints) + subtask_spec["COBALT"]["blocksize"] = cobalt_calculator.blockSize + if "correlator" in task_spec: - corr = CorrelatorSettings() - corr.nrChannelsPerSubband = task_spec["correlator"]["channels_per_subband"] - corr.integrationTime = task_spec["correlator"]["integration_time"] - calculator = BlockSize(constraints=BlockConstraints(correlatorSettings=corr)) - subtask_spec["COBALT"]["correlator"] = {} - subtask_spec["COBALT"]["correlator"]["enabled"] = True - subtask_spec["COBALT"]["correlator"]["blocks_per_integration"] = calculator.nrBlocks - subtask_spec["COBALT"]["correlator"]["integrations_per_block"] = calculator.nrSubblocks + subtask_spec["COBALT"]["correlator"]["blocks_per_integration"] = cobalt_calculator.nrBlocks + subtask_spec["COBALT"]["correlator"]["integrations_per_block"] = cobalt_calculator.nrSubblocks # make sure that the subtask_spec is valid conform the schema validate_json_against_schema(subtask_spec, subtask_template.schema) @@ -472,6 +508,18 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) def create_qaplots_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask: + if 'calibrator' in task_blueprint.specifications_template.name.lower(): + # Calibrator requires related Target Task Observation for some specifications + target_task_blueprint = get_related_target_observation_task_blueprint(task_blueprint) + if target_task_blueprint is None: + raise SubtaskCreationException("Cannot retrieve specifications for task id=%d because no related target observation is found" % task_blueprint.pk) + else: + target_task_blueprint = task_blueprint + + if not target_task_blueprint.specifications_doc.get("QA", {}).get("file_conversion", {}).get("enabled", False): + logger.debug("Skipping creation of qaplots_subtask because QA.file_conversion is not enabled") + return None + qafile_subtasks = [st for st in task_blueprint.subtasks.all() if st.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value] if qafile_subtasks: qafile_subtask = qafile_subtasks[0] # TODO: decide what to do when there are multiple qafile subtasks? @@ -673,7 +721,7 @@ def schedule_subtask(subtask: Subtask) -> Subtask: logger.error(e2) finally: # ... and re-raise the original exception (wrapped) - raise SubtaskSchedulingException("Error while scheduling subtask id=%d: %s" % (subtask.pk, str(e))) + raise SubtaskSchedulingException("Error while scheduling subtask id=%d" % (subtask.pk,)) from e def unschedule_subtask(subtask: Subtask) -> Subtask: @@ -1100,48 +1148,117 @@ def schedule_observation_subtask(observation_subtask: Subtask): # TODO: are there any observations that take input dataproducts? # step 3: create output dataproducts, and link these to the output + dataproducts = [] specifications_doc = observation_subtask.specifications_doc - dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="SAP") # todo: should this be derived from the task relation specification template? 
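The filename-ordering assumption flagged in the TODOs above is replaced by matching on each dataproduct's specifications_doc, so the parset adapter and the scheduling code below must agree on its shape. Hypothetical example documents (all field values invented) in the form that schedule_observation_subtask constructs and that the parset adapter filters on:

    # conforms to dataproduct_specifications_template-visibilities-1.json
    visibilities_spec = {
        "sap": "123456_target0",    # SAP name, not a numeric index
        "subband": 42,              # absolute subband number
    }

    # conforms to dataproduct_specifications_template-timeseries-1.json
    timeseries_spec = {
        "sap": "123456_target0",
        "coherent": True,           # matched against tab['coherent'] in the adapter
        "identifiers": {
            "pipeline_index": 0,    # beamformer pipeline within COBALT
            "tab_index": 2,         # TAB within the SAP (antenna field for fly's eye)
            "stokes_index": 0,
            "part_index": 1,        # file part within the TAB
        },
    }
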
dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty") subtask_output = observation_subtask.outputs.first() # TODO: make proper selection, not default first() - directory = "/data/%s/%s/L%s/uv" % ("projects" if isProductionEnvironment() else "test-projects", - observation_subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project.name, - observation_subtask.id) - - for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings']): - antennaset = specifications_doc['stations']['antenna_set'] - antennafields = [] - for station in specifications_doc['stations']['station_list']: - fields = antennafields_for_antennaset_and_station(antennaset, station) - antennafields += [{"station": station, "field": field, "type": antennaset.split('_')[0]} for field in fields] - - sap = SAP.objects.create(specifications_doc={ "name": "%s_%s" % (observation_subtask.id, pointing['name']), - "pointing": pointing['pointing'], - "time": {"start_time": observation_subtask.start_time.isoformat(), - "duration": (observation_subtask.stop_time - observation_subtask.start_time).total_seconds()}, - "antennas": { + + # create SAP objects, as observations create new beams + antennaset = specifications_doc['stations']['antenna_set'] + antennafields = [] + for station in specifications_doc['stations']['station_list']: + fields = antennafields_for_antennaset_and_station(antennaset, station) + antennafields += [{"station": station, "field": field, "type": antennaset.split('_')[0]} for field in fields] + + saps = [SAP.objects.create(specifications_doc={ "name": "%s_%s" % (observation_subtask.id, pointing['name']), + "pointing": pointing['pointing'], + "time": {"start_time": observation_subtask.start_time.isoformat(), + "duration": (observation_subtask.stop_time - observation_subtask.start_time).total_seconds()}, + "antennas": { "antenna_set": antennaset, "fields": antennafields - } - }, - specifications_template=SAPTemplate.objects.get(name="SAP")) - - # create dataproducts in bulk, and assign each dp its own unique global identifier - dp_global_identifiers = SIPidentifier.objects.bulk_create([SIPidentifier(source="TMSS") for _ in pointing['subbands']]) - Dataproduct.objects.bulk_create([Dataproduct(filename="L%d_SAP%03d_SB%03d_uv.MS" % (observation_subtask.id, sap_nr, sb_nr), - directory=directory, - dataformat=Dataformat.objects.get(value="MeasurementSet"), - datatype=Datatype.objects.get(value="visibilities"), - producer=subtask_output, - specifications_doc={"sap": [str(sap_nr)]}, - specifications_template=dataproduct_specifications_template, - feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema), - feedback_template=dataproduct_feedback_template, - size=0 if sb_nr%10==0 else 1024*1024*1024*sb_nr, - expected_size=1024*1024*1024*sb_nr, - sap=sap, - global_identifier=dp_global_identifier) - for sb_nr, dp_global_identifier in zip(pointing['subbands'], dp_global_identifiers)]) + } + }, + specifications_template=SAPTemplate.objects.get(name="SAP")) for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings'])] + + # store everything below this directory + directory = "/data/%s/%s/L%s" % ("projects" if isProductionEnvironment() else "test-projects", + observation_subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project.name, + observation_subtask.id) + + # create correlated dataproducts + if specifications_doc['COBALT']['correlator']['enabled']: + 
dataproduct_specifications_template_visibilities = DataproductSpecificationsTemplate.objects.get(name="visibilities") + sb_nr_offset = 0 # subband numbers run from 0 to (nr_subbands-1), increasing across SAPs + + for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings']): + for sb_nr, subband in enumerate(pointing['subbands'], start=sb_nr_offset): + dataproducts.append(Dataproduct(filename="L%d_SAP%03d_SB%03d_uv.MS" % (observation_subtask.id, sap_nr, sb_nr), + directory=directory+"/uv", + dataformat=Dataformat.objects.get(value="MeasurementSet"), + datatype=Datatype.objects.get(value="visibilities"), + producer=subtask_output, + specifications_doc={"sap": pointing["name"], "subband": subband}, + specifications_template=dataproduct_specifications_template_visibilities, + feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema), + feedback_template=dataproduct_feedback_template, + size=0, + expected_size=1024*1024*1024*sb_nr, + sap=saps[sap_nr], + global_identifier=None)) + + sb_nr_offset += len(pointing['subbands']) + + + # create beamformer dataproducts + dataproduct_specifications_template_timeseries = DataproductSpecificationsTemplate.objects.get(name="timeseries") + + def _sap_index(saps: list, sap_name: str) -> int: + """ Return the SAP index in the observation given a certain SAP name. """ + + sap_indices = [idx for idx,sap in enumerate(saps) if sap['name'] == sap_name] + + # needs to be exactly one hit + if len(sap_indices) != 1: + raise SubtaskSchedulingException("SAP name %s must appear exactly once in the specification. It appeared %d times. Available names: %s" % (sap_name, len(sap_indices), [sap['name'] for sap in saps])) + + return sap_indices[0] + + def tab_dataproducts(sap_nr, pipeline_nr, tab_nr, stokes_settings, coherent): + nr_subbands = len(sap['subbands']) or len(specifications_doc['stations']['digital_pointings'][sap_nr]['subbands']) + nr_stokes = len(stokes_settings['stokes']) + nr_parts = ceil(1.0 * nr_subbands / stokes_settings['subbands_per_file']) + + return [Dataproduct(filename="L%d_SAP%03d_N%03d_B%03d_S%03d_P%03d_bf.h5" % (observation_subtask.id, sap_nr, pipeline_nr, tab_nr, stokes_nr, part_nr), + directory=directory+("/cs" if coherent else "/is"), + dataformat=Dataformat.objects.get(value="Beamformed"), + datatype=Datatype.objects.get(value="time series"), + producer=subtask_output, + specifications_doc={"sap": specifications_doc['stations']['digital_pointings'][sap_nr]["name"], "coherent": coherent, "identifiers": {"pipeline_index": pipeline_nr, "tab_index": tab_nr, "stokes_index": stokes_nr, "part_index": part_nr}}, + specifications_template=dataproduct_specifications_template_timeseries, + feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema), + feedback_template=dataproduct_feedback_template, + size=0, + expected_size=1024*1024*1024*tab_nr, + sap=saps[sap_nr], + global_identifier=None) + for part_nr in range(nr_parts) for stokes_nr in range(nr_stokes)] + + + # beamformer pipelines: one set of dataproducts per TAB. 
+ pipeline_nr_offset = 0 + for pipeline_nr, pipeline in enumerate(specifications_doc['COBALT']['beamformer']['tab_pipelines'], start=pipeline_nr_offset): + for sap in pipeline['SAPs']: + sap_idx = _sap_index(specifications_doc['stations']['digital_pointings'], sap['name']) + + for tab_idx, tab in enumerate(sap['tabs']): + dataproducts += tab_dataproducts(sap_idx, pipeline_nr, tab_idx, pipeline['coherent'] if tab['coherent'] else pipeline['incoherent'], tab['coherent']) + + # fly's eye pipelines: one set of dataproducts per antenna field. + pipeline_nr_offset += len(specifications_doc['COBALT']['beamformer']['tab_pipelines']) + for pipeline_nr, pipeline in enumerate(specifications_doc['COBALT']['beamformer']['flyseye_pipelines'], start=pipeline_nr_offset): + for sap_idx, sap in enumerate(specifications_doc['stations']['digital_pointings']): + stations = pipeline['stations'] or specifications_doc['stations']['station_list'] + fields = sum([list(antenna_fields(station, antennaset)) for station in stations], []) + for tab_idx, tab in enumerate(fields): + dataproducts += tab_dataproducts(sap_idx, pipeline_nr, tab_idx, pipeline['coherent'], True) + + # Bulk create identifiers, and then update the dataproducts with a link to the actual created objects. + # This is needed as bulk_create needs to have any relations resolved. + dp_global_identifiers = SIPidentifier.objects.bulk_create([SIPidentifier(source="TMSS") for _ in dataproducts]) + for dp, global_identifier in zip(dataproducts, dp_global_identifiers): + dp.global_identifier = global_identifier + Dataproduct.objects.bulk_create(dataproducts) # step 4: resource assigner (if possible) assign_or_unassign_resources(observation_subtask) @@ -1194,7 +1311,7 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask): pipeline_subtask.specifications_template.type)) # TODO: use existing and reasonable selection and specification templates for output when we have those, for now, use "empty" - dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="empty") + dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="visibilities") dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty") # iterate over all inputs @@ -1225,7 +1342,7 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask): dataformat=dataformat, datatype=Datatype.objects.get(value="visibilities"), # todo: is this correct? 
producer=pipeline_subtask_output, - specifications_doc=get_default_json_object_for_schema(dataproduct_specifications_template.schema), + specifications_doc=input_dp.specifications_doc, specifications_template=dataproduct_specifications_template, feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema), feedback_template=dataproduct_feedback_template, @@ -1485,10 +1602,13 @@ def specifications_doc_meets_selection_doc(specifications_doc, selection_doc): meets_criteria = False else: spec = specifications_doc[k] - if isinstance(spec, Iterable) and isinstance(v, Iterable): + if isinstance(spec, list) and isinstance(v, list): for spec_v in spec: if spec_v not in v: meets_criteria = False + elif isinstance(v, list): + if spec not in v: + meets_criteria = False else: if spec != v: meets_criteria = False diff --git a/SAS/TMSS/backend/test/t_scheduling.py b/SAS/TMSS/backend/test/t_scheduling.py index 6dda9cf61de9fa857d009bec6204fad744de1e75..5bcfa16e9e29e9e82b75a3c5f13dff663a89289d 100755 --- a/SAS/TMSS/backend/test/t_scheduling.py +++ b/SAS/TMSS/backend/test/t_scheduling.py @@ -97,6 +97,18 @@ def create_reserved_stations_for_testing(station_list): assigned = rarpc.do_assignment(ra_spec) return assigned +def duplicates(l: list) -> list: + # O(n^2), but that's good enough. + uniques = [] + dupes = [] + + for e in l: + if e not in uniques: + uniques.append(e) + elif e not in dupes: + dupes.append(e) + + return dupes class SchedulingTest(unittest.TestCase): def setUp(self): @@ -113,13 +125,12 @@ class SchedulingTest(unittest.TestCase): test_data_creator.wipe_cache() - def test_schedule_observation_subtask_with_enough_resources_available(self): + def _test_schedule_observation_subtask_with_enough_resources_available(self, observation_specification_doc): with tmss_test_env.create_tmss_client() as client: task_blueprint_data = test_data_creator.TaskBlueprint(template_url=client.get_task_template(name="target observation")['url']) task_blueprint = test_data_creator.post_data_and_get_response_as_json_object(task_blueprint_data, '/task_blueprint/') subtask_template = client.get_subtask_template("observation control") - spec = get_default_json_object_for_schema(subtask_template['schema']) - spec['stations']['digital_pointings'][0]['subbands'] = [0] + spec = add_defaults_to_json_object_for_schema(observation_specification_doc, subtask_template['schema']) cluster_url = client.get_path_as_json_object('/cluster/1')['url'] subtask_data = test_data_creator.Subtask(specifications_template_url=subtask_template['url'], @@ -137,6 +148,34 @@ class SchedulingTest(unittest.TestCase): self.assertEqual('scheduled', subtask['state_value']) self.assertEqual('scheduled', tmss_test_env.ra_test_environment.radb.getTask(tmss_id=subtask_id)['status']) + # test whether all dataproduct specifications are unique + outputs = Subtask.objects.get(pk=subtask_id).outputs.all() + dataproduct_specifications_docs = [dp.specifications_doc for output in outputs for dp in output.dataproducts.all()] + duplicate_dataproduct_specification_docs = duplicates(dataproduct_specifications_docs) + + self.assertEqual([], duplicate_dataproduct_specification_docs) + + def test_schedule_observation_subtask_with_enough_resources_available(self): + spec = { "stations": { "digital_pointings": [ { "subbands": [0] } ] } } + self._test_schedule_observation_subtask_with_enough_resources_available(spec) + + def test_schedule_beamformer_observation_subtask_with_enough_resources_available(self): + spec = { + "stations": { 
"digital_pointings": [ { "name": "target0", "subbands": [0] } ] }, + "COBALT": { + "version": 1, + "correlator": { "enabled": False }, + "beamformer": { + "tab_pipelines": [ + { + "SAPs": [ { "name": "target0", "tabs": [ { "coherent": False }, { "coherent": True } ] } ] + } + ] + } + } + } + self._test_schedule_observation_subtask_with_enough_resources_available(spec) + def test_schedule_observation_subtask_with_one_blocking_reservation_failed(self): """ Set (Resource Assigner) station CS001 to reserved @@ -260,6 +299,7 @@ class SchedulingTest(unittest.TestCase): obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/') obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/') test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'], + specifications_doc={"sap": "target0", "subband": 0 }, subtask_output_url=obs_subtask_output_url), '/dataproduct/') # now create the pipeline... @@ -304,6 +344,7 @@ class SchedulingTest(unittest.TestCase): obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/') obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/') test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'], + specifications_doc={"sap": "target0", "subband": 0}, subtask_output_url=obs_subtask_output_url), '/dataproduct/') # now create the ingest... @@ -416,6 +457,23 @@ class SubtaskInputOutputTest(unittest.TestCase): setting.value = True setting.save() + + def test_specifications_doc_meets_selection_doc(self): + # empty selection matches all + self.assertTrue(specifications_doc_meets_selection_doc({'something else': 'target0'}, {})) + + # specification is a list? specification must be a subset of the selection + self.assertTrue(specifications_doc_meets_selection_doc({'sap': ['target0']}, {'sap': ['target0']})) + self.assertFalse(specifications_doc_meets_selection_doc({'sap': ['target0','target1','target2']}, {'sap': ['target0','target1']})) + + # specification is a value? 
it must appear in the selection + self.assertTrue(specifications_doc_meets_selection_doc({'sap': 'target0'}, {'sap': ['target0']})) + self.assertTrue(specifications_doc_meets_selection_doc({'sap': 'target0'}, {'sap': ['target0','target1']})) + self.assertTrue(specifications_doc_meets_selection_doc({'sap': 'target0'}, {'sap': 'target0'})) + + # specification must contain the selection key + self.assertFalse(specifications_doc_meets_selection_doc({'something else': 'target0'}, {'sap': 'target0'})) + @mock.patch("lofar.sas.tmss.tmss.tmssapp.subtasks.assign_or_unassign_resources") def test_schedule_pipeline_subtask_filters_predecessor_output_dataproducts_for_input(self, assign_resources_mock): # setup: @@ -431,12 +489,12 @@ class SubtaskInputOutputTest(unittest.TestCase): pipe_in2 = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=pipe_st, producer=obs_out2, selection_doc={'sap': ['target1']})) # create obs output dataproducts with specs we can filter on - dp1_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target0']})) - dp1_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target1']})) - dp1_3 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target0']})) + dp1_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': 'target0', 'subband': 0})) + dp1_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': 'target1', 'subband': 0})) + dp1_3 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': 'target0', 'subband': 1})) - dp2_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': ['target0']})) - dp2_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': ['target1']})) + dp2_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': 'target0', 'subband': 0})) + dp2_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': 'target1', 'subband': 0})) # trigger: # schedule pipeline, which should attach the correct subset of dataproducts to the pipeline inputs @@ -488,6 +546,7 @@ class SAPTest(unittest.TestCase): client.set_subtask_status(subtask_id, 'defined') subtask = client.schedule_subtask(subtask_id) + self.assertEqual(1, subtask_model.output_dataproducts.count()) self.assertEqual(1, subtask_model.output_dataproducts.values('sap').count()) self.assertEqual(subtask_model.output_dataproducts.first().sap.specifications_doc['pointing']['angle1'], pointing['angle1']) self.assertEqual(subtask_model.output_dataproducts.first().sap.specifications_doc['pointing']['angle2'], pointing['angle2']) @@ -505,8 +564,8 @@ class SAPTest(unittest.TestCase): pipe_in = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=pipe_st, producer=obs_out)) # create obs output dataproducts - dp1_in = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out)) - dp2_in = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out)) + dp1_in = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out, specifications_doc={"identifiers": { "sap_index": 0, "subband_index": 0 }})) + dp2_in = 
models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out, specifications_doc={"identifiers": { "sap_index": 0, "subband_index": 1 }})) # schedule pipeline, which should copy the SAP schedule_pipeline_subtask(pipe_st) diff --git a/SAS/TMSS/backend/test/t_schemas.py b/SAS/TMSS/backend/test/t_schemas.py index 0cf0157e39e2917d8baaa06384836c4795c41ab4..e9b25c35efca7a967bf7bf541c027cb15b836f7b 100755 --- a/SAS/TMSS/backend/test/t_schemas.py +++ b/SAS/TMSS/backend/test/t_schemas.py @@ -43,13 +43,17 @@ class TestSchemas(unittest.TestCase): """ Check whether the given schema is valid. """ # Can all $refs be actually resolved? - logger.info("Resolving references for schema %s", name) - resolved_refs(schema) + try: + resolved_refs(schema) + except Exception as e: + raise Exception("Failed to resolve references in schema %s" % name) from e # Does this schema provide actually valid defaults? - logger.info("Validating defaults of schema %s", name) - defaults = get_default_json_object_for_schema(schema) - validate_json_against_schema(defaults, schema) + try: + defaults = get_default_json_object_for_schema(schema) + validate_json_against_schema(defaults, schema) + except Exception as e: + raise Exception("Failure in defaults in schema %s" % name) from e def check_schema_table(self, model): """ Check all schemas present in the database for a given model. """ diff --git a/SAS/TMSS/backend/test/t_subtasks.py b/SAS/TMSS/backend/test/t_subtasks.py index 806fcd682579d20829b1b010f5548fb530ae73e1..8086f231da703fba4bcdf574bed9940f0ee6d3d2 100755 --- a/SAS/TMSS/backend/test/t_subtasks.py +++ b/SAS/TMSS/backend/test/t_subtasks.py @@ -181,9 +181,9 @@ class SubTasksCreationFromTaskBluePrint(unittest.TestCase): self.assertEqual(None, subtask) - # Next call will fail due to no qa_files object - # ValueError: Cannot create qa_plots subtask for task_blueprint id=1 because it has no qafile subtask(s) - with self.assertRaises(SubtaskCreationException): - subtask = create_qaplots_subtask_from_task_blueprint(task_blueprint) + # Next call no longer fails on a missing qa_files object + subtask = create_qaplots_subtask_from_task_blueprint(task_blueprint) + # subtask object is None because QA file conversion is not enabled by default + self.assertEqual(None, subtask) def test_create_sequence_of_subtask_from_task_blueprint_with_QA_enabled(self): diff --git a/SAS/TMSS/client/lib/populate.py b/SAS/TMSS/client/lib/populate.py index 6d3420403a6490f9b74c7117f4fb845bce66e9e5..986629cd64dc732e4f337fd41dfe74ef8299b09e 100644 --- a/SAS/TMSS/client/lib/populate.py +++ b/SAS/TMSS/client/lib/populate.py @@ -44,8 +44,11 @@ def populate_schemas(schema_dir: str=None, templates_filename: str=None): # load all templates and schemas and prepare them for upload. # determine the dependencies, and upload the depenends first, and the rest in parallel later. for template in templates: - with open(os.path.join(schema_dir, template['file_name'])) as schema_file: - json_schema = json.loads(schema_file.read()) + try: + with open(os.path.join(schema_dir, template['file_name'])) as schema_file: + json_schema = json.loads(schema_file.read()) + except Exception as e: + raise Exception("Could not decode JSON schema %s" % template['file_name']) from e # add template name/description/version from schema if not already in template template['name'] = template.get('name', json_schema.get('title', '<no name>'))
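The list handling introduced in specifications_doc_meets_selection_doc (and pinned down by the new test_specifications_doc_meets_selection_doc above) boils down to three rules: a list-valued specification must be a subset of the selection, a scalar specification must equal the selection or be a member of a list-valued selection, and every selection key must be present in the specification. A self-contained restatement of those semantics, simplified from the TMSS function for illustration:

    def meets_selection(spec_doc: dict, selection_doc: dict) -> bool:
        # an empty selection matches everything
        for key, selection in selection_doc.items():
            if key not in spec_doc:
                return False                              # selection key must be present
            spec = spec_doc[key]
            if isinstance(spec, list) and isinstance(selection, list):
                if not all(v in selection for v in spec):
                    return False                          # list spec: subset of selection
            elif isinstance(selection, list):
                if spec not in selection:
                    return False                          # scalar spec: member of selection
            elif spec != selection:
                return False                              # scalar vs scalar: equality
        return True

    assert meets_selection({'sap': 'target0', 'subband': 0}, {'sap': ['target0', 'target1']})
    assert not meets_selection({'sap': ['target0', 'target2']}, {'sap': ['target0', 'target1']})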