diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
index 5714cc07f5648cc64a60ede4350a024e9d34edf8..e52499669aeb153f8243893e81d8b900f8b98161 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
@@ -60,6 +60,17 @@ def _stokes_settings_parset_subkeys(stokes_spec: dict) -> dict:
 
     return parset
 
+def _dataproduct_parset_subkeys(subtask: models.Subtask, dataproducts: list) -> dict:
+    """ Return a subset of parset keys and values to list dataproducts. """
+
+    parset = {}
+    parset["enabled"] = len(dataproducts) > 0
+    parset["filenames"] = [dp.filename for dp in dataproducts]
+    parset["skip"] = [0] * len(dataproducts)
+    parset["locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in dataproducts]
+
+    return parset
+
 def _sap_index(saps: dict, sap_name: str) -> int:
     """ Return the SAP index in the observation given a certain SAP name. """
 
@@ -80,17 +91,14 @@ def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: d
     digi_beams = spec['stations']['digital_pointings']
 
     parset = {}
-    parset["Observation.DataProducts.Output_Correlated.enabled"] = correlator_enabled
-    parset["Observation.DataProducts.Output_Correlated.filenames"] = []
-    parset["Observation.DataProducts.Output_Correlated.locations"] = []
-    parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = subtask.cluster.name # TODO: This must be the storage cluster, not the processing cluster
-    parset["Observation.DataProducts.Output_Correlated.storageClusterPartition"] = "/data/test-projects"
 
     # ResourceEstimator always wants these keys
     parset["Cobalt.Correlator.nrChannelsPerSubband"] = spec['COBALT']['correlator']['channels_per_subband'] if correlator_enabled else 16
     parset["Cobalt.Correlator.nrBlocksPerIntegration"] = spec['COBALT']['correlator']['blocks_per_integration'] if correlator_enabled else 1
     parset["Cobalt.Correlator.nrIntegrationsPerBlock"] = spec['COBALT']['correlator']['integrations_per_block'] if correlator_enabled else 1
 
+    correlator_dataproducts = []
+
     if correlator_enabled:
         if cobalt_version >= 2 and 'phase_centers' in spec['COBALT']['correlator']:
             for beam_nr, digi_beam in enumerate(digi_beams):
@@ -110,7 +118,6 @@ def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: d
         dataproducts = list(subtask.output_dataproducts.filter(dataformat__value=Dataformat.Choices.MEASUREMENTSET.value).filter(datatype__value=Datatype.Choices.VISIBILITIES.value).order_by('filename')) # marshall dataproducts, but only if they're supplied. in some use cases, we want a parset before the subtask is scheduled.
 
-        correlator_dataproducts = []
         for digi_beam in digi_beams:
             for subband in digi_beam["subbands"]:
                 dataproduct = [dp for dp in dataproducts
@@ -119,10 +126,12 @@ def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: d
 
                 correlator_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct)
 
-    parset["Observation.DataProducts.Output_Correlated.filenames"] = [dp.filename for dp in correlator_dataproducts]
-    parset["Observation.DataProducts.Output_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in correlator_dataproducts]
-    # mimic MoM placeholder thingy (the resource estimator parses this)
-    parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))]
+    parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, correlator_dataproducts), "Observation.DataProducts.Output_Correlated."))
+    parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = subtask.cluster.name # TODO: This must be the storage cluster, not the processing cluster
+    parset["Observation.DataProducts.Output_Correlated.storageClusterPartition"] = "/data/test-projects"
+
+    # mimic MoM placeholder thingy (the resource estimator parses this)
+    parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))]
 
     return parset
 
@@ -266,15 +275,11 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d
     # 2) TAB
     # 3) Stokes
     # 4) Part
-    parset["Observation.DataProducts.Output_CoherentStokes.enabled"] = len(coherent_dataproducts) > 0
-    parset["Observation.DataProducts.Output_CoherentStokes.filenames"] = [dp.filename for dp in coherent_dataproducts]
-    parset["Observation.DataProducts.Output_CoherentStokes.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in coherent_dataproducts]
+    parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, coherent_dataproducts), "Observation.DataProducts.Output_CoherentStokes."))
     parset["Observation.DataProducts.Output_CoherentStokes.storageClusterName"] = subtask.cluster.name # TODO: This must be the storage cluster, not the processing cluster
     parset["Observation.DataProducts.Output_CoherentStokes.storageClusterPartition"] = "/data/test-projects"
 
-    parset["Observation.DataProducts.Output_IncoherentStokes.enabled"] = len(incoherent_dataproducts) > 0
-    parset["Observation.DataProducts.Output_IncoherentStokes.filenames"] = [dp.filename for dp in incoherent_dataproducts]
-    parset["Observation.DataProducts.Output_IncoherentStokes.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in incoherent_dataproducts]
+    parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, incoherent_dataproducts), "Observation.DataProducts.Output_IncoherentStokes."))
     parset["Observation.DataProducts.Output_IncoherentStokes.storageClusterName"] = subtask.cluster.name # TODO: This must be the storage cluster, not the processing cluster
     parset["Observation.DataProducts.Output_IncoherentStokes.storageClusterPartition"] = "/data/test-projects"
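Note on the refactoring above: every Output_*/Input_* block now goes through the new _dataproduct_parset_subkeys() helper, combined with _add_prefix(), an existing helper in this module that is not shown in the diff. A rough sketch of the assumed combination, with illustrative values:

    # Sketch only: _add_prefix() is assumed to prepend a string to every key.
    def _add_prefix(parset: dict, prefix: str) -> dict:
        return {prefix + key: value for key, value in parset.items()}

    subkeys = {"enabled": True,
               "filenames": ["L123_SAP000_SB000_uv.MS"],                 # illustrative filename
               "skip": [0],
               "locations": ["CEP4:/data/test-projects/example/L123"]}  # illustrative location
    parset = _add_prefix(subkeys, "Observation.DataProducts.Output_Correlated.")
    # parset now holds e.g. "Observation.DataProducts.Output_Correlated.enabled": True

This keeps the parallel filenames/skip/locations lists and the enabled flag consistent by construction, instead of having each call site rebuild them by hand.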
""" -def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) -> dict: - # see https://support.astron.nl/confluence/pages/viewpage.action?spaceKey=TMSS&title=UC1+JSON + parset = dict() # make sure the spec is complete (including all non-filled in properties with default) spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, subtask.specifications_template.schema) - # ----------------------------------------------------------------------------------------------- - # Historic rationale: in TMSS-183 we made MAC run an actual observation from a TMSS specification. - # With the help of Auke and Jan-David I could generate the parset as defined below. - # MAC turned out to be very sensitive for having specific keys with very specific prefixes etc. - # As a result, the generated parset contains many "duplicate"(nested) keys. - # We all agree that this is ugly, and we should not want this, but hey... it works. - # We decided to keep it like this, and maybe do more tuning/pruning later in the TMSS project. - # Or, we can just get rid of this to-parset-adaper when MAC has been rewritten to the new station API. - # ----------------------------------------------------------------------------------------------- - - parset = dict() # parameterset has no proper assignment operators, so take detour via dict... - # General parset["prefix"] = "LOFAR." parset["Observation.ObsID"] = subtask.pk parset["Observation.momID"] = 0 # Needed by MACScheduler parset["Observation.otdbID"] = subtask.pk # HACK: the pipeline uses otdbID as the sasID. our tmssID>2000000 to prevent clashes. TODO: replace all otdbID's by sasID. parset["Observation.tmssID"] = subtask.pk + parset["Observation.startTime"] = formatDatetime(subtask.start_time) if isinstance(subtask.start_time, datetime) else subtask.start_time + parset["Observation.stopTime"] = formatDatetime(subtask.stop_time) if isinstance(subtask.stop_time, datetime) else subtask.stop_time + parset["Observation.processType"] = "Pipeline" - parset["Observation.processSubtype"] = "Averaging Pipeline" - parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "preprocessing_pipeline.py" - parset["Observation.ObservationControl.PythonControl.softwareVersion"] = "" + project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in subtask.task_blueprints.all()]) if len(project_set) != 1: raise ConversionException('Subtask pk=%s cannot be converted to parset because it references task blueprint that belong to different projects (names=%s)' % (subtask.pk, project_set)) @@ -443,60 +438,85 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) parset["Observation.Cluster.ProcessingCluster.numberOfTasks"] = 110 # deprecated (fixed value) to be completely removed in parset with 'JDM-patch 'soon parset["Observation.Cluster.ProcessingCluster.numberOfCoresPerTask"] = 2 # deprecated (fixed value) to be completely removed in parset with 'JDM-patch 'soon + return parset + + +def _convert_to_parset_dict_for_preprocessing_pipeline_schema(subtask: models.Subtask) -> dict: + # see https://support.astron.nl/confluence/pages/viewpage.action?spaceKey=TMSS&title=UC1+JSON + + # make sure the spec is complete (including all non-filled in properties with default) + spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, subtask.specifications_template.schema) + + # ----------------------------------------------------------------------------------------------- + # Historic 
rationale: in TMSS-183 we made MAC run an actual observation from a TMSS specification. + # With the help of Auke and Jan-David I could generate the parset as defined below. + # MAC turned out to be very sensitive for having specific keys with very specific prefixes etc. + # As a result, the generated parset contains many "duplicate"(nested) keys. + # We all agree that this is ugly, and we should not want this, but hey... it works. + # We decided to keep it like this, and maybe do more tuning/pruning later in the TMSS project. + # Or, we can just get rid of this to-parset-adaper when MAC has been rewritten to the new station API. + # ----------------------------------------------------------------------------------------------- + + # General + parset = _common_parset_dict_for_pipeline_schemas(subtask) + parset["Observation.processSubtype"] = "Averaging Pipeline" + parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "preprocessing_pipeline.py" + parset["Observation.ObservationControl.PythonControl.softwareVersion"] = "" + # DPPP steps dppp_steps = [] if spec["preflagger0"]["enabled"]: dppp_steps.append('preflagger[0]') - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].chan"] = "[%s]" % spec["preflagger0"]["channels"] - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].abstime"] = "[]" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].azimuth"] = "[]" + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].chan"] = spec["preflagger0"]["channels"].split(",") + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].abstime"] = [] + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].azimuth"] = [] parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].baseline"] = "" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].blrange"] = "[]" + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].blrange"] = [] parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].corrtype"] = "" parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].count.path"] = "-" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].count.save"] = "false" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].elevation"] = "[]" + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].count.save"] = False + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].elevation"] = [] parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].expr"] = "" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].freqrange"] = "[]" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].lst"] = "[]" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].reltime"] = "[]" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].timeofday"] = "[]" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].timeslot"] = "[]" + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].freqrange"] = [] + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].lst"] = [] + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].reltime"] = [] + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].timeofday"] = [] + 
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].timeslot"] = [] parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].type"] = "preflagger" if spec["preflagger1"]["enabled"]: dppp_steps.append('preflagger[1]') parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].corrtype"] = spec["preflagger1"]["corrtype"] - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].abstime"] = "[]" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].azimuth"] = "[]" + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].abstime"] = [] + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].azimuth"] = [] parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].baseline"] = "" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].blrange"] = "[]" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].chan"] = "[]" + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].blrange"] = [] + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].chan"] = [] parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].count.path"] = "-" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].count.save"] = "false" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].elevation"] = "[]" + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].count.save"] = False + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].elevation"] = [] parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].expr"] = "" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].freqrange"] = "[]" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].lst"] = "[]" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].reltime"] = "[]" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].timeofday"] = "[]" - parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].timeslot"] = "[]" + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].freqrange"] = [] + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].lst"] = [] + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].reltime"] = [] + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].timeofday"] = [] + parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].timeslot"] = [] parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].type"] = "preflagger" if spec["aoflagger"]["enabled"]: dppp_steps.append('aoflagger') parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.strategy"] = spec["aoflagger"]["strategy"] - parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.autocorr"] = "F" + parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.autocorr"] = False parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.count.path"] = "-" - parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.count.save"] = "FALSE" - parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.keepstatistics"] = "T" - parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.memorymax"] = "10" - parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.memoryperc"] = "0" - 
parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.overlapmax"] = "0" - parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.overlapperc"] = "0" - parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.pedantic"] = "F" - parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.pulsar"] = "F" - parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.timewindow"] = "0" + parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.count.save"] = False + parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.keepstatistics"] = True + parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.memorymax"] = 10 + parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.memoryperc"] = 0 + parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.overlapmax"] = 0 + parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.overlapperc"] = 0 + parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.pedantic"] = False + parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.pulsar"] = False + parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.timewindow"] = 0 parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.type"] = "aoflagger" if spec["demixer"]["enabled"]: @@ -509,13 +529,13 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) parset["Observation.ObservationControl.PythonControl.DPPP.demixer.ignoretarget"] = spec["demixer"]["ignore_target"] parset["Observation.ObservationControl.PythonControl.PreProcessing.demix_always"] = spec["demixer"]["demix_always"] parset["Observation.ObservationControl.PythonControl.PreProcessing.demix_if_needed"] = spec["demixer"]["demix_if_needed"] - parset["Observation.ObservationControl.PythonControl.DPPP.demixer.blrange"] = "[]" + parset["Observation.ObservationControl.PythonControl.DPPP.demixer.blrange"] = [] parset["Observation.ObservationControl.PythonControl.DPPP.demixer.corrtype"] = "cross" parset["Observation.ObservationControl.PythonControl.DPPP.demixer.elevationcutoff"] = "0.0deg" parset["Observation.ObservationControl.PythonControl.DPPP.demixer.instrumentmodel"] = "instrument" - parset["Observation.ObservationControl.PythonControl.DPPP.demixer.modelsources"] = "[]" - parset["Observation.ObservationControl.PythonControl.DPPP.demixer.ntimechunk"] = "0" - parset["Observation.ObservationControl.PythonControl.DPPP.demixer.othersources"] = "[]" + parset["Observation.ObservationControl.PythonControl.DPPP.demixer.modelsources"] = [] + parset["Observation.ObservationControl.PythonControl.DPPP.demixer.ntimechunk"] = 0 + parset["Observation.ObservationControl.PythonControl.DPPP.demixer.othersources"] = [] parset["Observation.ObservationControl.PythonControl.DPPP.demixer.skymodel"] = "sky" parset["Observation.ObservationControl.PythonControl.DPPP.demixer.subtractsources"] = "" parset["Observation.ObservationControl.PythonControl.DPPP.demixer.targetsource"] = "" @@ -525,26 +545,21 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) parset["Observation.ObservationControl.PythonControl.DPPP.demixer.freqstep"] = 1 parset["Observation.ObservationControl.PythonControl.DPPP.demixer.timestep"] = 1 - parset["Observation.ObservationControl.PythonControl.DPPP.steps"] = "[%s]" % ",".join(dppp_steps) + parset["Observation.ObservationControl.PythonControl.DPPP.steps"] = dppp_steps 
parset["Observation.ObservationControl.PythonControl.DPPP.msout.storagemanager.name"] = spec["storagemanager"] # Dataproducts - parset["Observation.DataProducts.Input_Correlated.enabled"] = "true" + subtask_inputs = list(subtask.inputs.all()) + in_dataproducts = sum([list(subtask_input.dataproducts.all()) for subtask_input in subtask_inputs],[]) - in_dataproducts = [] - for input_nr, subtask_input in enumerate(subtask.inputs.all()): - in_dataproducts = subtask_input.dataproducts.all() - parset["Observation.DataProducts.Input_Correlated.filenames"] = [dp.filename for dp in in_dataproducts] - parset["Observation.DataProducts.Input_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in in_dataproducts] - # mimic MoM placeholder thingy (the resource assigner parses this) - # should be expanded with SAPS and datatypes - parset["Observation.DataProducts.Input_Correlated.identifications"] = "[TMSS_subtask_%s.SAP%03d]" % (subtask_input.producer.subtask.id, input_nr) + parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, in_dataproducts), "Observation.DataProducts.Input_Correlated.")) - parset["Observation.DataProducts.Input_Correlated.skip"] = "[%s]" % ",".join(['0']*len(in_dataproducts)) + # mimic MoM placeholder thingy (the resource assigner parses this) + # should be expanded with SAPS and datatypes + parset["Observation.DataProducts.Input_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask_input.producer.subtask.id, input_nr) for input_nr, subtask_input in enumerate(subtask_inputs)] - # TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work - subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id)) - unsorted_out_dataproducts = sum([list(models.Dataproduct.objects.filter(producer_id=subtask_output.id)) for subtask_output in subtask_outputs],[]) + subtask_outputs = list(subtask.outputs.all()) + unsorted_out_dataproducts = sum([list(subtask_output.dataproducts.all()) for subtask_output in subtask_outputs],[]) def find_dataproduct(dataproducts: list, specification_doc: dict): hits = [dp for dp in dataproducts if dp.specifications_doc['sap'] == specification_doc['sap'] @@ -554,39 +569,36 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) # list output dataproducts in the same order as input dataproducts, matched by the identifiers out_dataproducts = [find_dataproduct(unsorted_out_dataproducts, in_dp.specifications_doc) for in_dp in in_dataproducts] - parset["Observation.DataProducts.Output_Correlated.enabled"] = "true" - parset["Observation.DataProducts.Output_Correlated.filenames"] = [dp.filename for dp in out_dataproducts] - parset["Observation.DataProducts.Output_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in out_dataproducts] - parset["Observation.DataProducts.Output_Correlated.skip"] = "[%s]" % ",".join(['0']*len(out_dataproducts)) - parset["Observation.DataProducts.Output_Correlated.identifications"] = "[TMSS_subtask_%s.SAP%03d]" % (subtask.id, 0) + parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, out_dataproducts), "Observation.DataProducts.Output_Correlated.")) + parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, 0)] parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = subtask.cluster.name # Other parset["Observation.ObservationControl.PythonControl.PreProcessing.SkyModel"] = "Ateam_LBA_CC" - 
parset["Observation.ObservationControl.PythonControl.DPPP.checkparset"] = "-1" + parset["Observation.ObservationControl.PythonControl.DPPP.checkparset"] = -1 - parset["Observation.ObservationControl.PythonControl.DPPP.msin.autoweight"] = "true" - parset["Observation.ObservationControl.PythonControl.DPPP.msin.band"] = "-1" + parset["Observation.ObservationControl.PythonControl.DPPP.msin.autoweight"] = True + parset["Observation.ObservationControl.PythonControl.DPPP.msin.band"] = -1 parset["Observation.ObservationControl.PythonControl.DPPP.msin.baseline"] = "" - parset["Observation.ObservationControl.PythonControl.DPPP.msin.blrange"] = "[]" + parset["Observation.ObservationControl.PythonControl.DPPP.msin.blrange"] = [] parset["Observation.ObservationControl.PythonControl.DPPP.msin.corrtype"] = "" parset["Observation.ObservationControl.PythonControl.DPPP.msin.datacolumn"] = "DATA" - parset["Observation.ObservationControl.PythonControl.DPPP.msin.forceautoweight"] = "false" - parset["Observation.ObservationControl.PythonControl.DPPP.msin.missingdata"] = "false" + parset["Observation.ObservationControl.PythonControl.DPPP.msin.forceautoweight"] = False + parset["Observation.ObservationControl.PythonControl.DPPP.msin.missingdata"] = False parset["Observation.ObservationControl.PythonControl.DPPP.msin.nchan"] = "nchan" - parset["Observation.ObservationControl.PythonControl.DPPP.msin.orderms"] = "false" - parset["Observation.ObservationControl.PythonControl.DPPP.msin.sort"] = "false" - parset["Observation.ObservationControl.PythonControl.DPPP.msin.startchan"] = "0" - parset["Observation.ObservationControl.PythonControl.DPPP.msin.useflag"] = "true" - parset["Observation.ObservationControl.PythonControl.DPPP.msout.overwrite"] = "false" - parset["Observation.ObservationControl.PythonControl.DPPP.msout.tilenchan"] = "8" - parset["Observation.ObservationControl.PythonControl.DPPP.msout.tilesize"] = "4096" + parset["Observation.ObservationControl.PythonControl.DPPP.msin.orderms"] = False + parset["Observation.ObservationControl.PythonControl.DPPP.msin.sort"] = False + parset["Observation.ObservationControl.PythonControl.DPPP.msin.startchan"] = 0 + parset["Observation.ObservationControl.PythonControl.DPPP.msin.useflag"] = True + parset["Observation.ObservationControl.PythonControl.DPPP.msout.overwrite"] = False + parset["Observation.ObservationControl.PythonControl.DPPP.msout.tilenchan"] = 8 + parset["Observation.ObservationControl.PythonControl.DPPP.msout.tilesize"] = 4096 parset["Observation.ObservationControl.PythonControl.DPPP.msout.vdsdir"] = "A" - parset["Observation.ObservationControl.PythonControl.DPPP.msout.writefullresflag"] = "true" + parset["Observation.ObservationControl.PythonControl.DPPP.msout.writefullresflag"] = True - parset["Observation.ObservationControl.PythonControl.DPPP.showprogress"] = "F" - parset["Observation.ObservationControl.PythonControl.DPPP.showtimings"] = "F" - parset["Observation.ObservationControl.PythonControl.DPPP.uselogger"] = "T" + parset["Observation.ObservationControl.PythonControl.DPPP.showprogress"] = False + parset["Observation.ObservationControl.PythonControl.DPPP.showtimings"] = False + parset["Observation.ObservationControl.PythonControl.DPPP.uselogger"] = True # pragmatic solution to deal with the various parset using subsystems... 
@@ -597,10 +609,75 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask)
 
     return parset
 
+def _convert_to_parset_dict_for_pulsarpipeline_schema(subtask: models.Subtask) -> dict:
+    # make sure the spec is complete (including all non-filled in properties with default)
+    spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, subtask.specifications_template.schema)
+
+    # General
+    parset = _common_parset_dict_for_pipeline_schemas(subtask)
+    parset["Observation.processSubtype"] = "Pulsar Pipeline"
+    parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "pulsar_pipeline.py"
+    parset["Observation.ObservationControl.PythonControl.softwareVersion"] = "lofar-pulp"
+
+    # Pulsar pipeline settings
+    parset["Observation.ObservationControl.PythonControl.Pulsar.2bf2fits_extra_opts"] = spec["presto"]["2bf2fits_extra_opts"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.8bit_conversion_sigma"] = spec["output"]["8bit_conversion_sigma"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.decode_nblocks"] = spec["presto"]["decode_nblocks"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.decode_sigma"] = spec["presto"]["decode_sigma"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.digifil_extra_opts"] = spec["dspsr"]["digifil_extra_opts"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.dspsr_extra_opts"] = spec["dspsr"]["dspsr_extra_opts"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.dynamic_spectrum_time_average"] = spec["output"]["dynamic_spectrum_time_average"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.nofold"] = spec["presto"]["nofold"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.nopdmp"] = spec["dspsr"]["nopdmp"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.norfi"] = spec["dspsr"]["norfi"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.prepdata_extra_opts"] = spec["presto"]["prepdata_extra_opts"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.prepfold_extra_opts"] = spec["presto"]["prepfold_extra_opts"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.prepsubband_extra_opts"] = spec["presto"]["prepsubband_extra_opts"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.pulsar"] = spec["pulsar"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.raw_to_8bit"] = spec["output"]["raw_to_8bit"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.rfifind_extra_opts"] = spec["presto"]["rfifind_extra_opts"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.rrats"] = spec["presto"]["rrats"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.rrats_dm_range"] = spec["presto"]["rrats_dm_range"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.single_pulse"] = spec["single_pulse"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.skip_dspsr"] = spec["dspsr"]["skip_dspsr"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.skip_dynamic_spectrum"] = spec["output"]["skip_dynamic_spectrum"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.skip_prepfold"] = spec["presto"]["skip_prepfold"]
+    parset["Observation.ObservationControl.PythonControl.Pulsar.tsubint"] = spec["dspsr"]["tsubint"]
+
+    # Dataproducts. NOTE: The pulsar pipeline doesn't actually use this information, and reads input/writes output as it pleases.
+
+    inputs = subtask.inputs.all()
+    in_dataproducts = sum([list(subtask_input.dataproducts.all()) for subtask_input in inputs], [])
+    coherent_in_dataproducts = [dp for dp in in_dataproducts if dp.specifications_doc["coherent"]]
+    incoherent_in_dataproducts = [dp for dp in in_dataproducts if not dp.specifications_doc["coherent"]]
+
+    parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, coherent_in_dataproducts), "Observation.DataProducts.Input_CoherentStokes."))
+    parset["Observation.DataProducts.Input_CoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (input.producer.subtask.id, 0) for input in inputs] # needed by ResourceEstimator
+
+    parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, incoherent_in_dataproducts), "Observation.DataProducts.Input_IncoherentStokes."))
+    parset["Observation.DataProducts.Input_IncoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (input.producer.subtask.id, 0) for input in inputs] # needed by ResourceEstimator
+
+    # TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work
+    subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id))
+    out_dataproducts = sum([list(models.Dataproduct.objects.filter(producer_id=subtask_output.id)) for subtask_output in subtask_outputs], []) # todo, order these correctly?
+
+    parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, out_dataproducts), "Observation.DataProducts.Output_Pulsar."))
+    parset["Observation.DataProducts.Output_Pulsar.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, 0)]
+    parset["Observation.DataProducts.Output_Pulsar.storageClusterName"] = subtask.cluster.name
+
+    # pragmatic solution to deal with the various parset using subsystems...
+    # some want the keys as "Observation.<subkey>" and some as "ObsSW.Observation.<subkey>"
+    # so, just copy all "Observation.<subkey>" keys and prepend them with ObsSW.
+    for key, value in list(parset.items()):
+        if key.startswith("Observation."):
+            parset["ObsSW."+key] = value
+
+    return parset
 
 # dict to store conversion methods based on subtask.specifications_template.name
 _convertors = {'observation control': _convert_to_parset_dict_for_observationcontrol_schema,
-               'pipeline control': _convert_to_parset_dict_for_pipelinecontrol_schema }
+               'preprocessing pipeline': _convert_to_parset_dict_for_preprocessing_pipeline_schema,
+               'pulsar pipeline': _convert_to_parset_dict_for_pulsarpipeline_schema}
 
 def convert_to_parset(subtask: models.Subtask) -> parameterset:
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py
index f647a9a9caada1b2c7b4e8a044ce5e15a6d22619..4d162144843eeb0367673bd31cce1eaca620ab41 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py
@@ -185,7 +185,7 @@ def create_sip_representation_for_subtask(subtask: Subtask):
                                sourcedata_identifiers=sourcedata_identifiers,
                                process_map=process_map)
 
-    if subtask.specifications_template.name == "pipeline control": # todo: re-evaluate this because schema name might change
+    if subtask.specifications_template.name == "preprocessing pipeline": # todo: re-evaluate this because schema name might change
         spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, subtask.specifications_template.schema)
         pipeline = siplib.AveragingPipeline( # <-- this is what we need for UC1
             pipeline_map,
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py
index b7e4c3b56074d6d6b21d6bec481ef603f562895b..31f5ca0a85ac7a679c725d03922678ab468e0588 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py
@@ -75,6 +75,7 @@ class Datatype(AbstractChoice):
         INSTRUMENT_MODEL = "instrument model"
         IMAGE = "image"
         QUALITY = "quality"
+        PULSAR_PROFILE = "pulsar profile"
 
 class Dataformat(AbstractChoice):
@@ -86,6 +87,8 @@ class Dataformat(AbstractChoice):
         BEAMFORMED = "Beamformed"
         QA_HDF5 = "QA_HDF5"
         QA_PLOTS = "QA_Plots"
+        PULP_SUMMARY = "pulp summary"
+        PULP_ANALYSIS = "pulp analysis"
 
 class CopyReason(AbstractChoice):
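For orientation, the dispatch that consumes the _convertors mapping is convert_to_parset(), whose signature is visible above. A sketch of the dispatch logic under that assumption (the real body lives outside this diff):

    # Sketch: look up the convertor by template name and apply it (assumed behaviour).
    def convert_to_parset_dict(subtask: models.Subtask) -> dict:
        try:
            convertor = _convertors[subtask.specifications_template.name]
        except KeyError:
            raise ConversionException("no parset convertor for template '%s' of subtask id=%s"
                                      % (subtask.specifications_template.name, subtask.id))
        return convertor(subtask)

Renaming the 'pipeline control' entry therefore silently drops parset support for any leftover subtask that still references the old template name; the populate/migration step has to keep the two names in sync.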
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/populate.py b/SAS/TMSS/backend/src/tmss/tmssapp/populate.py
index e07b02aaa9d6f2c20295a13aa9c3661da3f0bb64..ecb2c6f7ad1705651ee97084dc9c0e827aeede35 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/populate.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/populate.py
@@ -420,6 +420,32 @@ def populate_connectors():
                                      task_template=TaskTemplate.objects.get(name='target observation'),
                                      iotype=IOType.objects.get(value=IOType.Choices.OUTPUT.value))
 
+    # beamforming observation
+    TaskConnectorType.objects.create(role=Role.objects.get(value=Role.Choices.BEAMFORMER.value),
+                                     datatype=Datatype.objects.get(value=Datatype.Choices.TIME_SERIES.value),
+                                     dataformat=Dataformat.objects.get(value=Dataformat.Choices.BEAMFORMED.value),
+                                     task_template=TaskTemplate.objects.get(name='beamforming observation'),
+                                     iotype=IOType.objects.get(value=IOType.Choices.OUTPUT.value))
+
+    # pulsar pipeline
+    TaskConnectorType.objects.create(role=Role.objects.get(value=Role.Choices.BEAMFORMER.value),
+                                     datatype=Datatype.objects.get(value=Datatype.Choices.TIME_SERIES.value),
+                                     dataformat=Dataformat.objects.get(value=Dataformat.Choices.BEAMFORMED.value),
+                                     task_template=TaskTemplate.objects.get(name='pulsar pipeline'),
+                                     iotype=IOType.objects.get(value=IOType.Choices.INPUT.value))
+
+    TaskConnectorType.objects.create(role=Role.objects.get(value=Role.Choices.ANY.value),
+                                     datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value),
+                                     dataformat=Dataformat.objects.get(value=Dataformat.Choices.PULP_SUMMARY.value),
+                                     task_template=TaskTemplate.objects.get(name='pulsar pipeline'),
+                                     iotype=IOType.objects.get(value=IOType.Choices.OUTPUT.value))
+
+    TaskConnectorType.objects.create(role=Role.objects.get(value=Role.Choices.ANY.value),
+                                     datatype=Datatype.objects.get(value=Datatype.Choices.PULSAR_PROFILE.value),
+                                     dataformat=Dataformat.objects.get(value=Dataformat.Choices.PULP_ANALYSIS.value),
+                                     task_template=TaskTemplate.objects.get(name='pulsar pipeline'),
+                                     iotype=IOType.objects.get(value=IOType.Choices.OUTPUT.value))
+
     # preprocessing pipeline
     for iotype_value in (IOType.Choices.INPUT.value, IOType.Choices.OUTPUT.value):
         TaskConnectorType.objects.create(role=Role.objects.get(value=Role.Choices.ANY.value),
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/common_schema_template-tasks-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/common_schema_template-tasks-1.json
index 398542538b828ae57a2d392dffcb79e8259ac87e..ae7d909686d137cd581b0701bc6af5c754a3254f 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/common_schema_template-tasks-1.json
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/common_schema_template-tasks-1.json
@@ -22,13 +22,13 @@
         "type": "string",
         "title": "Data Type",
         "description": "The data type of a task connector describes its what kind of data is produced/consumed.",
-        "enum": ["visibilities", "time series", "instrument model", "image", "quality"]
+        "enum": ["visibilities", "time series", "instrument model", "image", "quality", "pulsar profile"]
       },
       "dataformat": {
         "type": "string",
         "title": "Data Format",
         "description": "The data type of a task connector describes in which format the data is produced/consumed.",
-        "enum": ["MeasurementSet", "Beamformed", "QA_HDF5", "QA_Plots"]
+        "enum": ["MeasurementSet", "Beamformed", "QA_HDF5", "QA_Plots", "pulp summary", "pulp analysis"]
       }
     },
     "required": [
@@ -38,4 +38,4 @@
     ]
   }
 }
-}
\ No newline at end of file
+}
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_feedback_template-pulp.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_feedback_template-pulp.json
new file mode 100644
index 0000000000000000000000000000000000000000..f731916f10ee6eb6a8336dd3d5b4dd67b90f7ceb
--- /dev/null
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_feedback_template-pulp.json
@@ -0,0 +1,175 @@
+{
+  "$id":"http://tmss.lofar.org/api/schemas/dataproductfeedbacktemplate/feedback/1#",
+  "$schema": "http://json-schema.org/draft-06/schema#",
+  "title": "feedback",
+  "type": "object",
+  "default": {},
+  "properties": {
+    "percentage_written": {
+      "title": "Percentage written",
+      "type": "integer",
+      "default": 0
+    },
+    "frequency": {
+      "title": "Frequency",
+      "type": "object",
+      "default": {},
+      "properties": {
+        "subbands": {
+          "title": "Subbands",
+          "type": "array",
+          "default": [],
+          "items": {
+            "title": "Subband",
+            "type": "integer",
+            "minimum": 0,
+            "maximum": 511
+          }
+        },
+        "central_frequencies": {
+          "title": "Central frequencies",
+          "type": "array",
+          "default": [],
+          "items": {
+            "title": "frequency",
+            "type": "number",
+            "default": 0.0,
+            "minimum": 0.0
+          }
+        },
+        "channel_width": {
+          "title": "Channel width",
+          "type": "number",
+          "default": 3051.8,
+          "minimum": 0.0
+        },
+        "channels_per_subband": {
+          "title": "Channels per subband",
+          "type": "integer",
+          "default": 64,
+          "minimum": 1
+        }
+      },
+      "required": [ "subbands", "central_frequencies", "channel_width", "channels_per_subband" ]
+    },
+    "time": {
+      "title": "Time",
+      "type": "object",
+      "default": {},
+      "properties": {
+        "start_time": {
+          "title": "Start time",
+          "$ref": "http://tmss.lofar.org/api/schemas/commonschematemplate/datetime/1/#/definitions/timestamp",
+          "default": "1970-01-01T00:00:00Z"
+        },
+        "duration": {
+          "title": "Duration",
+          "type": "number",
+          "default": 0.0
+        },
+        "sample_width": {
+          "title": "Sample width",
+          "type": "number",
+          "default": 0.0
+        }
+      },
+      "required": [ "start_time", "duration", "sample_width" ]
+    },
+    "antennas": {
+      "$ref": "http://tmss.lofar.org/api/schemas/commonschematemplate/stations/1/#/definitions/antennas",
+      "default": {}
+    },
+    "target": {
+      "title": "Target",
+      "type": "object",
+      "default": {},
+      "properties": {
+        "pointing": {
+          "title": "Pointing",
+          "$ref": "http://tmss.lofar.org/api/schemas/commonschematemplate/pointing/1/#/definitions/pointing",
+          "default": {}
+        }
+      },
+      "required": [ "pointing" ]
+    },
+    "samples": {
+      "title": "Samples",
+      "type": "object",
+      "default": {},
+      "properties": {
+        "polarisations": {
+          "title": "Polarisations",
+          "type": "array",
+          "default": [ "XX", "XY", "YX", "YY" ],
+          "items": {
+            "title": "Polarisation",
+            "type": "string",
+            "default": "I",
+            "enum": [ "XX", "XY", "YX", "YY", "I", "Q", "U", "V", "Xr", "Xi", "Yr", "Yi" ]
+          }
+        },
+        "type": {
+          "title": "Type",
+          "type": "string",
+          "default": "float",
+          "enum": [ "float", "integer" ]
+        },
+        "complex": {
+          "title": "Complex values",
+          "type": "boolean",
+          "default": true
+        },
+        "bits": {
+          "title": "Bits per sample",
+          "type": "integer",
+          "default": 32,
+          "enum": [ 4, 8, 16, 32, 64 ]
+        },
+        "writer": {
+          "title": "Writer",
+          "type": "string",
+          "default": "standard",
+          "enum": [ "lofarstman", "standard", "dysco" ]
+        },
+        "writer_version": {
+          "title": "Writer version",
+          "type": "string",
+          "default": "UNKNOWN"
+        }
+      },
+      "required": [ "polarisations", "type", "complex", "bits", "writer" ]
+    }
+  },
+  "required": [ "percentage_written", "frequency", "time", "antennas", "target", "samples" ]
+}
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-pulp-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-pulp-1.json
new file mode 100644
index 0000000000000000000000000000000000000000..47ba6271b11466d5687e23fbc641ab160b7ad86a
--- /dev/null
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-pulp-1.json
@@ -0,0 +1,34 @@
+{
+  "$id":"http://tmss.lofar.org/api/schemas/dataproductspecificationtemplate/pulp summary/1#",
+  "$schema": "http://json-schema.org/draft-06/schema#",
+  "title": "pulp summary",
+  "type": "object",
+  "default": {},
+  "properties": {
+    "coherent": {
+      "title": "Coherent",
+      "description": "Summary covers coherent or incoherent TABs",
+      "type": "boolean",
+      "default": true
+    },
+    "identifiers": {
+      "title": "Identifiers",
+      "description": "Identification of this dataproduct within the producing subtask.",
+      "type": "object",
+      "default": {},
+      "properties": {
+        "obsid": {
+          "title": "Observation ID",
+          "description": "Summary covers TABs of this subtask observation ID",
+          "type": "integer",
+          "default": 0,
+          "minimum": 0
+        }
+      },
+      "required": [ "obsid" ]
+    }
+  },
+  "required": [ "identifiers" ]
+}
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-timeseries-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-timeseries-1.json
index d11ec11cc085263e455984410ad0f4e3dcc8e5ca..04b609dbe320ff4cb9af1cdef19fcb17d7fc1b49 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-timeseries-1.json
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-timeseries-1.json
@@ -1,7 +1,7 @@
 {
-  "$id":"http://tmss.lofar.org/api/schemas/dataproductspecificationtemplate/timeseries/1#",
+  "$id":"http://tmss.lofar.org/api/schemas/dataproductspecificationtemplate/time series/1#",
   "$schema": "http://json-schema.org/draft-06/schema#",
-  "title": "timeseries",
+  "title": "time series",
   "type": "object",
   "default": {},
   "properties": {
@@ -10,6 +10,12 @@
       "title": "SAP",
       "default": ""
     },
+    "coherent": {
+      "title": "Coherent",
+      "description": "TAB is a coherent addition",
+      "type": "boolean",
+      "default": true
+    },
     "identifiers": {
       "title": "Identifiers",
       "description": "Identification of this dataproduct within the producing subtask.",
@@ -50,20 +56,13 @@
           "default": 0,
           "minimum": 0,
           "maximum": 3
-        },
-        "coherent": {
-          "title": "Coherent",
-          "description": "TAB is a coherent addition",
-          "type": "boolean",
-          "default": true
         }
       },
       "required": [
         "sap_index",
         "tab_index",
        "part_index",
-        "stokes_index",
-        "coherent"
+        "stokes_index"
       ]
     }
   },
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-pipeline-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-preprocessing-pipeline-1.json
similarity index 96%
rename from SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-pipeline-1.json
rename to SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-preprocessing-pipeline-1.json
index e52ab545b6fb1fc8224b83a9144f880dbd0fed1f..1fb96f5442e695448fd2f8e6a91d9d20516bdecb 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-pipeline-1.json
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-preprocessing-pipeline-1.json
@@ -1,8 +1,8 @@
 {
-  "$id":"http://tmss.lofar.org/api/schemas/subtasktemplate/pipeline control/1#",
+  "$id":"http://tmss.lofar.org/api/schemas/subtasktemplate/preprocessing pipeline/1#",
   "$schema": "http://json-schema.org/draft-06/schema#",
-  "title":"pipeline control",
-  "description":"This schema defines the parameters to setup and control a (preprocessing) pipeline subtask.",
+  "title":"preprocessing pipeline",
+  "description":"This schema defines the parameters to setup and control a preprocessing pipeline subtask.",
   "version":1,
   "type": "object",
   "properties": {
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-pulsar-pipeline-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-pulsar-pipeline-1.json
new file mode 100644
index 0000000000000000000000000000000000000000..cdf9f7717ef46f9acc4d51aa25f6b66ad1b5541e
--- /dev/null
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-pulsar-pipeline-1.json
@@ -0,0 +1,179 @@
+{
+  "$id": "http://tmss.lofar.org/api/schemas/subtasktemplate/pulsar pipeline/1#",
+  "type": "object",
+  "title": "pulsar pipeline",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "properties": {
+    "pulsar": {
+      "type": "string",
+      "title": "Pulsar name/strategy",
+      "description": "Name of the pulsar to fold, or strategy how to find it",
+      "default": "tabfind+"
+    },
+    "single_pulse": {
+      "type": "boolean",
+      "title": "Single-pulse search",
+      "default": false
+    },
+    "threads": {
+      "type": "integer",
+      "title": "Number of CPU threads to use",
+      "default": 2,
+      "minimum": 1
+    },
+    "presto": {
+      "title": "PRESTO",
+      "type": "object",
+      "default": {},
+      "additionalProperties": false,
+      "properties": {
+        "2bf2fits_extra_opts": {
+          "type": "string",
+          "title": "2bf2fits options",
+          "description": "HDF5 to PSRFITS command-line options",
+          "default": ""
+        },
+        "decode_nblocks": {
+          "title": "Decode nr blocks",
+          "description": "Number of blocks to read & decode at once",
+          "type": "integer",
+          "minimum": 1,
+          "default": 100
+        },
+        "decode_sigma": {
+          "title": "Decode sigma",
+          "description": "Sigma threshold for decoding",
+          "type": "number",
+          "minimum": 1,
+          "default": 3
+        },
+        "nofold": {
+          "title": "Skip folding",
+          "description": "If true, do not fold the pulsar",
+          "type": "boolean",
+          "default": false
+        },
+        "prepdata_extra_opts": {
+          "type": "string",
+          "title": "prepdata options",
+          "description": "PREPDATA command-line options",
+          "default": ""
+        },
+        "prepfold_extra_opts": {
+          "type": "string",
+          "title": "prepfold options",
+          "description": "PREPFOLD command-line options",
+          "default": ""
+        },
+        "prepsubband_extra_opts": {
+          "type": "string",
+          "title": "prepsubband options",
+          "description": "PREPSUBBAND command-line options",
+          "default": ""
+        },
+        "rfifind_extra_opts": {
+          "type": "string",
+          "title": "RFI find options",
+          "description": "RFIFIND command-line options",
+          "default": ""
+        },
+        "rrats": {
+          "title": "RRATs analysis",
+          "type": "boolean",
+          "default": false
+        },
+        "rrats_dm_range": {
+          "title": "RRATs DM range",
+          "type": "number",
+          "minimum": 0.0,
+          "default": 5.0
+        },
+        "skip_prepfold": {
+          "title": "Skip PREPFOLD",
+          "type": "boolean",
+          "default": false
+        }
+      }
+    },
+    "dspsr": {
+      "title": "DSPSR",
+      "type": "object",
+      "default": {},
+      "additionalProperties": false,
+      "properties": {
+        "skip_dspsr": {
+          "type": "boolean",
+          "title": "Skip DSPSR",
+          "description": "If true, do not run DSPSR",
+          "default": false
+        },
+        "digifil_extra_opts": {
+          "type": "string",
+          "title": "DIGIFIL options",
+          "description": "DIGIFIL command-line options",
+          "default": ""
+        },
+        "dspsr_extra_opts": {
+          "type": "string",
+          "title": "DSPSR options",
+          "description": "DSPSR command-line options",
+          "default": ""
+        },
+        "nopdmp": {
+          "title": "Skip optimising period & DM",
+          "type": "boolean",
+          "default": false
+        },
+        "norfi": {
+          "title": "Skip RFI cleaning",
+          "type": "boolean",
+          "default": false
+        },
+        "tsubint": {
+          "title": "Subintegration length",
+          "type": "integer",
+          "minimum": -1,
+          "default": -1
+        }
+      }
+    },
+    "output": {
+      "title": "Output",
+      "type": "object",
+      "default": {},
+      "additionalProperties": false,
+      "properties": {
+        "raw_to_8bit": {
+          "type": "boolean",
+          "title": "Convert to 8 bit",
+          "description": "Convert output from 32-bit to 8-bit samples",
+          "default": false
+        },
+        "8bit_conversion_sigma": {
+          "type": "number",
+          "title": "Conversion sigma",
+          "description": "Conversion sigma to use when converting to 8-bit samples",
+          "minimum": 1.0,
+          "default": 5.0
+        },
+        "skip_dynamic_spectrum": {
+          "title": "Skip dynamic spectrum",
+          "type": "boolean",
+          "default": false
+        },
+        "dynamic_spectrum_time_average": {
+          "title": "Dynamic spectrum time average",
+          "type": "number",
+          "minimum": 0.01,
+          "default": 0.5
+        }
+      }
+    }
+  },
+  "required": [
+    "pulsar",
+    "presto",
+    "dspsr",
+    "output"
+  ]
+}
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/task_template-pulsar_pipeline-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/task_template-pulsar_pipeline-1.json
new file mode 100644
index 0000000000000000000000000000000000000000..ff7248ca01a0bc7f560bc6ea7d2fceff269a9dd7
--- /dev/null
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/task_template-pulsar_pipeline-1.json
@@ -0,0 +1,224 @@
+{
+  "$id": "http://tmss.lofar.org/api/schemas/tasktemplate/pulsar pipeline/1#",
+  "type": "object",
+  "title": "pulsar pipeline",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "properties": {
+    "pulsar": {
+      "title": "Pulsar to fold",
+      "type": "object",
+      "default": {},
+      "additionalProperties": false,
+      "properties": {
+        "strategy": {
+          "type": "string",
+          "title": "Strategy",
+          "description": "How to look up the pulsar to fold",
+          "default": "manual",
+          "enum": [ "manual", "meta", "sapfind", "sapfind3", "tabfind", "tabfind+" ]
+        },
+        "name": {
+          "type": "string",
+          "title": "Name",
+          "description": "Name of the pulsar to fold, if strategy=manual",
+          "default": ""
+        }
+      }
+    },
+    "single_pulse_search": {
+      "type": "boolean",
+      "title": "Single-pulse search",
+      "default": false
+    },
+    "presto": {
+      "title": "PRESTO",
+      "type": "object",
+      "default": {},
+      "additionalProperties": false,
+      "properties": {
+        "input": {
+          "title": "Input",
+          "type": "object",
+          "additionalProperties": false,
+          "properties": {
+            "nr_blocks": {
+              "title": "Nr of blocks",
+              "description": "Number of blocks to read at a time",
+              "type": "integer",
+              "minimum": 1,
+              "default": 100
+            },
+            "samples_per_block": {
+              "title": "Block size (samples)",
+              "type": "integer",
+              "minimum": 512,
+              "default": 8192
+            },
+            "decode_sigma": {
+              "title": "Decode sigma",
+              "description": "Sigma threshold for decoding",
+              "type": "number",
+              "minimum": 1,
+              "default": 3
+            }
+          }
+        },
+        "fold_profile": {
+          "title": "Fold",
+          "description": "Fold the pulsar profile",
+          "type": "boolean",
+          "default": true
+        },
+        "prepfold": {
+          "title": "Enable prepfold",
+          "type": "boolean",
+          "default": true
+        },
+        "rrats": {
+          "title": "RRATs analysis",
+          "type": "object",
+          "default": {},
+          "additionalProperties": false,
+          "properties": {
+            "enabled": {
+              "title": "Enabled",
+              "type": "boolean",
+              "default": false
+            },
+            "dm_range": {
+              "title": "DM range",
+              "type": "number",
+              "minimum": 0,
+              "default": 5
+            }
+          }
+        }
+      }
+    },
+    "dspsr": {
+      "title": "DSPSR",
+      "type": "object",
+      "default": {},
+      "additionalProperties": false,
+      "properties": {
+        "enabled": {
+          "type": "boolean",
+          "title": "Enabled",
+          "default": true
+        },
+        "digifil": {
+          "title": "DIGIFIL",
+          "type": "object",
+          "default": {},
+          "additionalProperties": false,
+          "properties": {
+            "dm": {
+              "title": "DM",
+              "description": "Dispersion Measure (0.0 for none)",
+              "type": "number",
+              "minimum": 0,
+              "default": 0
+            },
+            "integration_time": {
+              "title": "Integration time",
+              "type": "number",
+              "minimum": 0.1,
+              "default": 4
+            },
+            "frequency_channels": {
+              "title": "Frequency channels",
+              "description": "Number of frequency channels (multiple of subbands/part)",
+              "type": "integer",
+              "minimum": 1,
+              "maximum": 512,
+              "default": 512
+            },
+            "coherent_dedispersion": {
+              "title": "Coherent Dedispersion",
+              "type": "boolean",
+              "default": true
+            }
+          }
+        },
+        "optimise_period_dm": {
+          "title": "Optimise period & DM",
+          "type": "boolean",
+          "default": true
+        },
+        "rfi_excision": {
+          "title": "RFI excision",
+          "description": "Excise/clean/remove detected RFI",
+          "type": "boolean",
+          "default": true
+        },
+        "subintegration_length": {
+          "title": "Subintegration length",
+          "type": "integer",
+          "minimum": -1,
+          "default": -1
+        }
+      }
+    },
+    "output": {
+      "title": "Output",
+      "type": "object",
+      "default": {},
+      "additionalProperties": false,
+      "properties": {
+        "quantisation": {
+          "title": "Quantisation",
+          "description": "Quantise output into 8-bit samples",
+          "type": "object",
+          "default": {},
+          "additionalProperties": false,
+          "properties": {
+            "enabled": {
+              "type": "boolean",
+              "title": "Enabled",
+              "default": false
+            },
+            "scale": {
+              "type": "number",
+              "title": "Conversion sigma",
+              "description": "Conversion sigma to use when converting to 8-bit samples",
+              "minimum": 1,
+              "default": 5
+            }
+          }
+        },
+        "dynamic_spectrum": {
+          "title": "Dynamic Spectrum",
+          "type": "object",
+          "default": {},
+          "additionalProperties": false,
+          "properties": {
+            "enabled": {
+              "type": "boolean",
+              "title": "Enabled",
+              "default": false
+            },
+            "time_average": {
+              "type": "number",
+              "title": "Time average",
+              "minimum": 0.01,
+              "default": 0.5
+            }
+          }
+        }
+      }
+    }
+  },
+  "required": [
+    "pulsar",
+    "presto",
+    "dspsr",
+    "output"
+  ]
+}
+ }, { "file_name": "subtask_template-observation-1.json", "template": "subtask_template", @@ -98,25 +111,32 @@ "queue": false }, { - "file_name": "subtask_template-pipeline-1.json", + "file_name": "subtask_template-preprocessing-pipeline-1.json", "template": "subtask_template", "type": "pipeline", - "realtime": true, - "queue": false + "realtime": false, + "queue": true + }, + { + "file_name": "subtask_template-pulsar-pipeline-1.json", + "template": "subtask_template", + "type": "pipeline", + "realtime": false, + "queue": true }, { "file_name": "subtask_template-qa_file-1.json", "template": "subtask_template", "type": "qa_files", - "realtime": true, - "queue": false + "realtime": false, + "queue": true }, { "file_name": "subtask_template-qa_plots-1.json", "template": "subtask_template", "type": "qa_plots", - "realtime": true, - "queue": false + "realtime": false, + "queue": true }, { "file_name": "scheduling_constraints_template-constraints-1.json", diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 3fe7622b1df278cdf097ad3b09552d7282674d30..0e1b04ec5bc00efbdaedbb1fd8b1793bfe052de0 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -8,6 +8,7 @@ from functools import cmp_to_key from collections.abc import Iterable from math import ceil from lofar.common.ring_coordinates import RingCoordinates +from os.path import splitext from lofar.common.datetimeutils import formatDatetime, round_to_second_precision from lofar.common import isProductionEnvironment @@ -62,6 +63,7 @@ def create_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subta create_qafile_subtask_from_task_blueprint, create_qaplots_subtask_from_task_blueprint], 'preprocessing pipeline': [create_preprocessing_subtask_from_task_blueprint], + 'pulsar pipeline': [create_pulsar_pipeline_subtask_from_task_blueprint], 'ingest': [create_ingest_subtask_from_task_blueprint], 'cleanup': [create_cleanup_subtask_from_task_blueprint]} generators_mapping['calibrator observation'] = generators_mapping['target observation'] @@ -647,7 +649,7 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta return qaplots_subtask -def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask: +def create_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint, subtask_template_name: str, generate_subtask_specs_from_task_spec_func) -> Subtask: ''' Create a subtask to for the preprocessing pipeline. This method implements "Instantiate subtasks" step from the "Specification Flow" https://support.astron.nl/confluence/display/TMSS/Specification+Flow @@ -655,6 +657,7 @@ def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBluepri # step 0: check pre-requisites check_prerequities_for_subtask_creation(task_blueprint) # TODO: go more elegant lookup of predecessor observation task + # TODO: do not require the input to come from an observation observation_predecessor_tasks = [t for t in task_blueprint.predecessors.all() if any(st for st in t.subtasks.all() if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value)] if not observation_predecessor_tasks: @@ -662,10 +665,11 @@ def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBluepri "to an observation predecessor (sub)task." 
                                        % task_blueprint.pk)
 
     # step 1: create subtask in defining state, with filled-in subtask_template
-    subtask_template = SubtaskTemplate.objects.get(name='pipeline control')
+    subtask_template = SubtaskTemplate.objects.get(name=subtask_template_name)
     default_subtask_specs = get_default_json_object_for_schema(subtask_template.schema)
     task_specs_with_defaults = add_defaults_to_json_object_for_schema(task_blueprint.specifications_doc, task_blueprint.specifications_template.schema)
-    subtask_specs = _generate_subtask_specs_from_preprocessing_task_specs(task_specs_with_defaults, default_subtask_specs)
+    subtask_specs = generate_subtask_specs_from_task_spec_func(task_specs_with_defaults, default_subtask_specs)
+    cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
     subtask_data = { "start_time": None,
                      "stop_time": None,
@@ -700,6 +704,14 @@
     return subtask
 
+def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
+    return create_pipeline_subtask_from_task_blueprint(task_blueprint, "preprocessing pipeline", _generate_subtask_specs_from_preprocessing_task_specs)
+
+
+def create_pulsar_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
+    return create_pipeline_subtask_from_task_blueprint(task_blueprint, "pulsar pipeline", _generate_subtask_specs_from_pulsar_pipeline_task_specs)
+
+
 def create_ingest_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
     ''' Create a subtask for an ingest job
     This method implements "Instantiate subtasks" step from the "Specification Flow"
@@ -1237,6 +1249,38 @@ def get_previous_related_task_blueprint_with_time_offset(task_blueprint):
     return previous_related_task_blueprint, time_offset
 
+def _bulk_create_dataproducts_with_global_identifiers(dataproducts: list) -> list:
+    """
+    Bulk create the provided dataproducts in the database, and give each of them a unique global identifier.
+
+    :return: the created dataproduct objects
+    """
+
+    # Bulk create identifiers, and then update the dataproducts with a link to the actual created objects.
+    # This is needed as bulk_create needs to have any relations resolved.
+    dp_global_identifiers = SIPidentifier.objects.bulk_create([SIPidentifier(source="TMSS") for _ in dataproducts])
+    for dp, global_identifier in zip(dataproducts, dp_global_identifiers):
+        dp.global_identifier = global_identifier
+
+    return Dataproduct.objects.bulk_create(dataproducts)
+
+
+def _output_root_directory(subtask: Subtask) -> str:
+    """ Return the directory under which output needs to be stored. """
+
+    # Support for several projects will be added in TMSS-689; for now, catch it.
+    project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in subtask.task_blueprints.all()])
+    if len(project_set) != 1:
+        raise SubtaskSchedulingException('Cannot schedule subtask id=%s because it references task blueprints that belong to different projects=%s' % (subtask.id, project_set))
+
+    project = list(project_set)[0]
+
+    directory = "/data/%s/%s/L%s" % ("projects" if isProductionEnvironment() else "test-projects",
+                                     project,
+                                     subtask.id)
+
+    return directory
+
 def schedule_observation_subtask(observation_subtask: Subtask):
     ''' Schedule the given observation_subtask
     For first observations in a 'train' of subtasks this method is typically called by hand, or by the short-term-scheduler.
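The two-phase bulk_create in _bulk_create_dataproducts_with_global_identifiers above deserves a closer look: Django's bulk_create() bypasses save() and signal handling, so every foreign key must point at rows that already exist before the referencing rows are inserted. A minimal standalone sketch of the same pattern, using stand-in models rather than the real SIPidentifier/Dataproduct from tmssapp.models:

from django.db import models

# Stand-in models; the real ones carry many more fields.
class Identifier(models.Model):
    source = models.CharField(max_length=16)

class Product(models.Model):
    filename = models.CharField(max_length=256)
    global_identifier = models.ForeignKey(Identifier, on_delete=models.PROTECT)

def bulk_create_with_identifiers(products: list) -> list:
    # Phase 1: one INSERT for all identifiers. On PostgreSQL (which TMSS uses)
    # bulk_create() hands the objects back with their primary keys filled in.
    identifiers = Identifier.objects.bulk_create([Identifier(source="TMSS") for _ in products])

    # Phase 2: resolve the relation on each still-unsaved product, then
    # bulk-insert the products themselves in a second single INSERT.
    for product, identifier in zip(products, identifiers):
        product.global_identifier = identifier
    return Product.objects.bulk_create(products)

Two round trips to the database regardless of the number of dataproducts, instead of two per dataproduct.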
@@ -1318,13 +1362,7 @@
                                         specifications_template=SAPTemplate.objects.get(name="SAP")) for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings'])]
 
     # store everything below this directory
-    # Support for several projects will be added in TMSS-689, for now catch it.
-    project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in observation_subtask.task_blueprints.all()])
-    if len(project_set) != 1:
-        raise SubtaskSchedulingException('Cannot schedule subtask id=%s because it references task blueprints that belong to different projects=%s' % (observation_subtask.id, project_set))
-    directory = "/data/%s/%s/L%s" % ("projects" if isProductionEnvironment() else "test-projects",
-                                     list(project_set)[0],  # TMSS-689: use correct project name for each dataproduct
-                                     observation_subtask.id)
+    directory = _output_root_directory(observation_subtask)
 
     # create correlated dataproducts
     if specifications_doc['COBALT']['correlator']['enabled']:
@@ -1357,7 +1395,7 @@
 
     # create beamformer dataproducts
-    dataproduct_specifications_template_timeseries = DataproductSpecificationsTemplate.objects.get(name="timeseries")
+    dataproduct_specifications_template_timeseries = DataproductSpecificationsTemplate.objects.get(name="time series")
 
     def _sap_index(saps: dict, sap_name: str) -> int:
         """ Return the SAP index in the observation given a certain SAP name. """
@@ -1408,12 +1446,8 @@
             for tab_idx, tab in enumerate(fields):
                 dataproducts += tab_dataproducts(sap_idx, pipeline_nr, tab_idx, pipeline['coherent'], True)
 
-    # Bulk create identifiers, and then update the dataproducts with a link to the actual created objects.
-    # This is needed as bulk_create needs to have any relations resolved.
-    dp_global_identifiers = SIPidentifier.objects.bulk_create([SIPidentifier(source="TMSS") for _ in dataproducts])
-    for dp, global_identifier in zip(dataproducts, dp_global_identifiers):
-        dp.global_identifier = global_identifier
-    Dataproduct.objects.bulk_create(dataproducts)
+    # create the dataproducts
+    _bulk_create_dataproducts_with_global_identifiers(dataproducts)
 
     # step 4: resource assigner (if possible)
     assign_or_unassign_resources(observation_subtask)
@@ -1427,6 +1461,121 @@
     return observation_subtask
 
+def _create_preprocessing_output_dataproducts_and_transforms(pipeline_subtask: Subtask, input_dataproducts: list):
+    # select the subtask output the new dataproducts will be linked to
+    pipeline_subtask_output = pipeline_subtask.outputs.first()  # TODO: if we have several, how to map input to output?
+
+    # TODO: create them from the spec, instead of "copying" the input filename
+    dataformat = Dataformat.objects.get(value="MeasurementSet")
+    datatype = Datatype.objects.get(value="visibilities")
+
+    # TODO: use existing and reasonable selection and specification templates for output when we have those; for now, use "empty"
+    dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="visibilities")
+    dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
+    directory = _output_root_directory(pipeline_subtask) + "/uv/"
+
+    # input:output mapping is 1:1
+    def output_dataproduct_filename(input_dp: Dataproduct) -> str:
+        """ Construct the output filename to produce for an input.
""" + if '_' in input_dp.filename and input_dp.filename.startswith('L'): + return "L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1]) + else: + return "L%s_%s" % (pipeline_subtask.pk, input_dp.filename) + + output_dataproducts = [Dataproduct(filename=output_dataproduct_filename(input_dp), + directory=directory, + dataformat=dataformat, + datatype=datatype, + producer=pipeline_subtask_output, + specifications_doc=input_dp.specifications_doc, + specifications_template=dataproduct_specifications_template, + feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema), + feedback_template=dataproduct_feedback_template, + sap=input_dp.sap, + global_identifier=None) for input_dp in input_dataproducts] + + # create the dataproducts + output_dataproducts = _bulk_create_dataproducts_with_global_identifiers(output_dataproducts) + pipeline_subtask_output.dataproducts.set(output_dataproducts) + + transforms = [DataproductTransform(input=input_dp, output=output_dp, identity=False) for input_dp,output_dp in zip(input_dataproducts, output_dataproducts)] + DataproductTransform.objects.bulk_create(transforms) + + return output_dataproducts + +def _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask: Subtask, input_dataproducts: list): + # select subtask output the new dataproducts will be linked to + pipeline_subtask_output = pipeline_subtask.outputs.first() # TODO: if we have several, how to map input to output? + + dataformat = Dataformat.objects.get(value="pulp analysis") + datatype = Datatype.objects.get(value="pulsar profile") + dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="time series") + dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty") + + directory = _output_root_directory(pipeline_subtask) + "pulp/" + + # ----- output tarball per input dataproduct + # input:output mapping is 1:1 + output_dataproducts = [Dataproduct(filename="%s.tar" % (splitext(input_dp.filename)[0],), # .h5 -> .tar + directory=directory+("cs/" if input_dp.specifications_doc["coherent"] else "is/"), + dataformat=dataformat, + datatype=datatype, + producer=pipeline_subtask_output, + specifications_doc=input_dp.specifications_doc, + specifications_template=dataproduct_specifications_template, + feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema), + feedback_template=dataproduct_feedback_template, + sap=input_dp.sap, + global_identifier=None) for input_dp in input_dataproducts] + + # create the dataproducts + output_dataproducts = _bulk_create_dataproducts_with_global_identifiers(output_dataproducts) + pipeline_subtask_output.dataproducts.set(output_dataproducts) + + transforms = [DataproductTransform(input=input_dp, output=output_dp, identity=False) for input_dp,output_dp in zip(input_dataproducts, output_dataproducts)] + DataproductTransform.objects.bulk_create(transforms) + + # ----- summary tarballs + # there is a tarball for each observation id and for cs and is separately, a tarball will be produced + + dataformat = Dataformat.objects.get(value="pulp summary") + datatype = Datatype.objects.get(value="quality") + dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="pulp summary") + dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty") + + def dp_obsid(dataproduct): + """ Return the obsid of the dataproduct. 
""" + + # we parse the filename, because that's what pulp does, too + return dataproduct.filename.split("_")[0] + + # construct how input dataproducts map onto the summaries + # we use (obsid, coherent) as key, as those are the distinguishing characteristics of a summary. + summary_mapping = {dp: (dp_obsid(dp), dp.specifications_doc["coherent"]) for dp in input_dataproducts} + summaries = set(summary_mapping.values()) + + summary_dataproducts = {(obsid, is_coherent): Dataproduct(filename="L%s_summary%s.tar" % (obsid, "CS" if is_coherent else "IS"), + directory=directory+("cs/" if is_coherent else "is/"), + dataformat=dataformat, + datatype=datatype, + producer=pipeline_subtask_output, + specifications_doc={ "coherent": is_coherent, "identifiers": { "obsid": obsid } }, + specifications_template=dataproduct_specifications_template, + feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema), + feedback_template=dataproduct_feedback_template, + sap=None, # TODO: Can we say anything here, as summaries cover all SAPs + global_identifier=None) for (obsid, is_coherent) in summaries} + + # create the dataproducts + _bulk_create_dataproducts_with_global_identifiers(summary_dataproducts.values()) + pipeline_subtask_output.dataproducts.add(*summary_dataproducts.values()) + + # populate the transform, each input_dp is input for its corresponding summary + transforms = [DataproductTransform(input=input_dp, output=summary_dataproducts[(obsid, is_coherent)], identity=False) for (input_dp, (obsid, is_coherent)) in summary_mapping.items()] + DataproductTransform.objects.bulk_create(transforms) + + return output_dataproducts + def schedule_pipeline_subtask(pipeline_subtask: Subtask): ''' Schedule the given pipeline_subtask This method should typically be called upon the event of an predecessor (observation) subtask being finished. @@ -1465,13 +1614,9 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask): raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (pipeline_subtask.pk, pipeline_subtask.specifications_template.type)) - # TODO: use existing and reasonable selection and specification templates for output when we have those, for now, use "empty" - dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="visibilities") - dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty") - # iterate over all inputs + input_dataproducts = [] for pipeline_subtask_input in pipeline_subtask.inputs.all(): - # select and set input dataproducts that meet the filter defined in selection_doc dataproducts = [dataproduct for dataproduct in pipeline_subtask_input.producer.dataproducts.all() if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, pipeline_subtask_input.selection_doc)] @@ -1482,47 +1627,20 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask): pipeline_subtask_input.id)) pipeline_subtask_input.dataproducts.set(dataproducts) + input_dataproducts.extend(dataproducts) - # select subtask output the new dataproducts will be linked to - pipeline_subtask_output = pipeline_subtask.outputs.first() # TODO: if we have several, how to map input to output? 
-
-        # step 3: create output dataproducts, and link these to the output
-        # TODO: create them from the spec, instead of "copying" the input filename
-        dataformat = Dataformat.objects.get(value="MeasurementSet")
-        input_dps = list(pipeline_subtask_input.dataproducts.all())
-        dp_global_identifiers = SIPidentifier.objects.bulk_create([SIPidentifier(source="TMSS") for _ in input_dps])
-        output_dp_objects = []
-        for input_dp, dp_global_identifier in zip(input_dps, dp_global_identifiers):
-            if '_' in input_dp.filename and input_dp.filename.startswith('L'):
-                filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1])
-            else:
-                filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename)
-
-            output_dp = Dataproduct(filename=filename,
-                                    directory=input_dp.directory.replace(str(pipeline_subtask_input.producer.subtask.pk), str(pipeline_subtask.pk)),
-                                    dataformat=dataformat,
-                                    datatype=Datatype.objects.get(value="visibilities"),  # todo: is this correct?
-                                    producer=pipeline_subtask_output,
-                                    specifications_doc=input_dp.specifications_doc,
-                                    specifications_template=dataproduct_specifications_template,
-                                    feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
-                                    feedback_template=dataproduct_feedback_template,
-                                    sap=input_dp.sap,
-                                    global_identifier=dp_global_identifier)
-            output_dp_objects.append(output_dp)
-
-        output_dps = Dataproduct.objects.bulk_create(output_dp_objects)
-        pipeline_subtask_output.dataproducts.set(output_dps)
-
-        transforms = [DataproductTransform(input=input_dp, output=output_dp, identity=False) for input_dp,output_dp in zip(input_dps, output_dps)]
-        DataproductTransform.objects.bulk_create(transforms)
+    # step 3: create output dataproducts, and link these to the output
+    if pipeline_subtask.specifications_template.name == "preprocessing pipeline":
+        _create_preprocessing_output_dataproducts_and_transforms(pipeline_subtask, input_dataproducts)
+    elif pipeline_subtask.specifications_template.name == "pulsar pipeline":
+        _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask, input_dataproducts)
 
-        # step 4: resource assigner (if possible)
-        assign_or_unassign_resources(pipeline_subtask)
+    # step 4: resource assigner (if possible)
+    assign_or_unassign_resources(pipeline_subtask)
 
-        # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it)
-        pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
-        pipeline_subtask.save()
+    # step 5: set state to SCHEDULED (so that this subtask will be picked up and run)
+    pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
+    pipeline_subtask.save()
 
     return pipeline_subtask
 
@@ -1735,11 +1853,6 @@ def schedule_independent_subtasks_in_task_blueprint(task_blueprint: TaskBlueprin
 
 def _generate_subtask_specs_from_preprocessing_task_specs(preprocessing_task_specs,
                                                           default_subtask_specs):
-    # todo: check that this is actually how these need to be translated
-    # todo: especially check when defaults are NOT supposed to be set because the task implies to not include them
-
-    # todo: set subtask demixer properties "baselines": "CS*,RS*&"
-
     subtask_specs = default_subtask_specs
     subtask_specs['storagemanager'] = preprocessing_task_specs['storagemanager']
@@ -1777,6 +1890,57 @@ def _generate_subtask_specs_from_preprocessing_task_specs(preprocessing_task_spe
     return subtask_specs
 
 
+def _generate_subtask_specs_from_pulsar_pipeline_task_specs(pipeline_task_specs,
+                                                            default_subtask_specs):
+    subtask_specs = {}
+
+    # Pulsar to fold
+    if pipeline_task_specs["pulsar"]["strategy"] == "manual":
+        # the pulsar is specified explicitly
+        subtask_specs["pulsar"] = pipeline_task_specs["pulsar"]["name"]
+    else:
+        # search for the pulsar (e.g. in a library, based on the SAP direction)
+        subtask_specs["pulsar"] = pipeline_task_specs["pulsar"]["strategy"]
+
+    subtask_specs["single_pulse"] = pipeline_task_specs["single_pulse_search"]
+
+    # PRESTO
+    presto_specs = pipeline_task_specs["presto"]
+    subtask_specs["presto"] = {}
+    subtask_specs["presto"]["2bf2fits_extra_opts"] = "-nsamples={samples_per_block}".format(**presto_specs["input"])
+    subtask_specs["presto"]["decode_nblocks"] = presto_specs["input"]["nr_blocks"]
+    subtask_specs["presto"]["decode_sigma"] = presto_specs["input"]["decode_sigma"]
+    subtask_specs["presto"]["nofold"] = not presto_specs["fold_profile"]
+    subtask_specs["presto"]["skip_prepfold"] = not presto_specs["prepfold"]
+    subtask_specs["presto"]["rrats"] = presto_specs["rrats"]["enabled"]
+    subtask_specs["presto"]["rrats_dm_range"] = presto_specs["rrats"]["dm_range"]
+    subtask_specs["presto"]["prepdata_extra_opts"] = ""
+    subtask_specs["presto"]["prepfold_extra_opts"] = ""
+    subtask_specs["presto"]["prepsubband_extra_opts"] = ""
+    subtask_specs["presto"]["rfifind_extra_opts"] = ""
+
+    # DSPSR
+    dspsr_specs = pipeline_task_specs["dspsr"]
+    subtask_specs["dspsr"] = {}
+    subtask_specs["dspsr"]["skip_dspsr"] = not dspsr_specs["enabled"]
+    subtask_specs["dspsr"]["digifil_extra_opts"] = "-D {dm} -t {integration_time} -f {frequency_channels}{dedisperse}".format(
+        **dspsr_specs["digifil"],
+        dedisperse=":D" if dspsr_specs["digifil"]["coherent_dedispersion"] else "")
+    subtask_specs["dspsr"]["nopdmp"] = not dspsr_specs["optimise_period_dm"]
+    subtask_specs["dspsr"]["norfi"] = not dspsr_specs["rfi_excision"]
+    subtask_specs["dspsr"]["tsubint"] = dspsr_specs["subintegration_length"]
+    subtask_specs["dspsr"]["dspsr_extra_opts"] = ""
+
+    # output
+    output_specs = pipeline_task_specs["output"]
+    subtask_specs["output"] = {}
+    subtask_specs["output"]["raw_to_8bit"] = output_specs["quantisation"]["enabled"]
+    subtask_specs["output"]["8bit_conversion_sigma"] = output_specs["quantisation"]["scale"]
+    subtask_specs["output"]["skip_dynamic_spectrum"] = not output_specs["dynamic_spectrum"]["enabled"]
+    subtask_specs["output"]["dynamic_spectrum_time_average"] = output_specs["dynamic_spectrum"]["time_average"]
+
+    return subtask_specs
+
+
 def specifications_doc_meets_selection_doc(specifications_doc, selection_doc):
     """ Filter specs by selection. This requires the specification_doc to...
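To see what the digifil option string above expands to, here is the same format() call run standalone on made-up digifil settings (the dict mirrors the "dspsr" layout of the task specification; the values themselves are illustrative):

# illustrative input, following the "dspsr"/"digifil" task specification layout
dspsr_specs = {"digifil": {"dm": 12.5,
                           "integration_time": 4,
                           "frequency_channels": 256,
                           "coherent_dedispersion": True}}

# same construction as digifil_extra_opts above; str.format() simply ignores
# the unused coherent_dedispersion key
opts = "-D {dm} -t {integration_time} -f {frequency_channels}{dedisperse}".format(
    **dspsr_specs["digifil"],
    dedisperse=":D" if dspsr_specs["digifil"]["coherent_dedispersion"] else "")

print(opts)  # -D 12.5 -t 4 -f 256:D

The ":D" suffix on the channel count is what requests coherent dedispersion in the generated options, matching the dedisperse argument in the code above.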
diff --git a/SAS/TMSS/backend/test/t_adapter.py b/SAS/TMSS/backend/test/t_adapter.py
index 7e6584db476356ab17752eec2328b5958fcb2396..5294abcc79b5e8a86fb6e718145964d71707cb08 100755
--- a/SAS/TMSS/backend/test/t_adapter.py
+++ b/SAS/TMSS/backend/test/t_adapter.py
@@ -45,14 +45,14 @@ rest_data_creator = TMSSRESTTestDataCreator(BASE_URL, AUTH)
 from lofar.sas.tmss.tmss.tmssapp import models
 from lofar.sas.tmss.tmss.exceptions import SubtaskInvalidStateException
 from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset, convert_to_parset_dict
-from lofar.common.json_utils import get_default_json_object_for_schema
+from lofar.common.json_utils import get_default_json_object_for_schema, add_defaults_to_json_object_for_schema
 from lofar.sas.tmss.tmss.tmssapp.adapters.sip import generate_sip_for_dataproduct
 from lofar.sas.tmss.tmss.tmssapp.adapters.feedback import append_to_subtask_raw_feedback, process_feedback_into_subtask_dataproducts, process_feedback_for_subtask_and_set_to_finished_if_complete, reprocess_raw_feedback_for_subtask_and_set_to_finished_if_complete
 from lofar.lta.sip import constants
 from lofar.parameterset import parameterset
 from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions
-from lofar.sas.resourceassignment.resourceassignmentestimator.resource_estimators import ObservationResourceEstimator
+from lofar.sas.resourceassignment.resourceassignmentestimator.resource_estimators import ObservationResourceEstimator, PulsarPipelineResourceEstimator
 
 class ObservationParsetAdapterTest(unittest.TestCase):
@@ -202,6 +202,29 @@ class ObservationParsetAdapterTest(unittest.TestCase):
         self.assertEqual(nr_is_files, estimations["estimates"][1]["output_files"]["is"][0]["properties"]["nr_of_is_files"] * estimations["estimates"][1]["resource_count"])
         self.assertEqual(4, estimations["estimates"][1]["output_files"]["is"][0]["properties"]["nr_of_is_stokes"])
 
+class PulsarPipelineParsetAdapterTest(unittest.TestCase):
+    def create_subtask(self, specifications_doc={}):
+        subtask_template = models.SubtaskTemplate.objects.get(name='pulsar pipeline')
+        specifications_doc = add_defaults_to_json_object_for_schema(specifications_doc, subtask_template.schema)
+
+        subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc)
+        subtask: models.Subtask = models.Subtask.objects.create(**subtask_data)
+
+        subtask.task_blueprints.set([models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())])
+        subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask))
+        dataproduct: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output))
+        return subtask
+
+    def test_pulp(self):
+        subtask = self.create_subtask()
+        parset = convert_to_parset_dict(subtask)
+        logger.info("test_pulp parset: %s", parset)
+
+        self.assertEqual(True, parset["Observation.DataProducts.Output_Pulsar.enabled"])
+
+        # TODO: The ResourceEstimator needs a predecessor observation with dataproducts, so we skip testing it for now.
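The create_subtask helper above passes a partial specifications_doc through add_defaults_to_json_object_for_schema before using it. The real implementation lives in lofar.common.json_utils; conceptually it recursively fills in schema defaults for any property the document omits, roughly like this simplified sketch (it only covers the nested-object and "default" cases relevant here), shown against the quantisation subschema from the pulsar pipeline template earlier in this change:

def add_defaults(doc, schema):
    """ Simplified sketch: recursively merge schema defaults into a document. """
    if schema.get("type") == "object":
        out = dict(doc)
        for key, subschema in schema.get("properties", {}).items():
            if key in out:
                out[key] = add_defaults(out[key], subschema)
            elif "default" in subschema:
                out[key] = add_defaults(subschema["default"], subschema)
        return out
    return doc

quantisation_schema = {"type": "object",
                       "properties": {"enabled": {"type": "boolean", "default": False},
                                      "scale":   {"type": "number",  "default": 5}}}

print(add_defaults({"enabled": True}, quantisation_schema))
# -> {'enabled': True, 'scale': 5}: supplied values win, missing ones get schema defaults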
+
+
 class SIPadapterTest(unittest.TestCase):
     def test_simple_sip_generate_from_dataproduct(self):
         """
@@ -383,7 +406,7 @@ _isCobalt=T
         set_subtask_state_following_allowed_transitions(subtask_obs, 'finishing')
         subtask_obs_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_obs))
 
-        subtask_data = Subtask_test_data(subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control'))
+        subtask_data = Subtask_test_data(subtask_template=models.SubtaskTemplate.objects.get(name='preprocessing pipeline'))
         subtask_pipe: models.Subtask = models.Subtask.objects.create(**subtask_data)
         set_subtask_state_following_allowed_transitions(subtask_pipe, 'finishing')
         subtask_pipe_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_pipe))
@@ -417,7 +440,7 @@ _isCobalt=T
         set_subtask_state_following_allowed_transitions(subtask_obs, 'finishing')
         subtask_obs_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_obs))
 
-        subtask_data = Subtask_test_data(subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control'))
+        subtask_data = Subtask_test_data(subtask_template=models.SubtaskTemplate.objects.get(name='preprocessing pipeline'))
         subtask_pipe: models.Subtask = models.Subtask.objects.create(**subtask_data)
         set_subtask_state_following_allowed_transitions(subtask_pipe, 'finishing')
         subtask_pipe_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_pipe))
@@ -461,7 +484,7 @@ _isCobalt=T
         set_subtask_state_following_allowed_transitions(subtask_obs, 'finishing')
         subtask_obs_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_obs))
 
-        subtask_data = Subtask_test_data(subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control'))
+        subtask_data = Subtask_test_data(subtask_template=models.SubtaskTemplate.objects.get(name='preprocessing pipeline'))
         subtask_pipe: models.Subtask = models.Subtask.objects.create(**subtask_data)
         set_subtask_state_following_allowed_transitions(subtask_pipe, 'finishing')
         subtask_pipe_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_pipe))
diff --git a/SAS/TMSS/backend/test/t_scheduling.py b/SAS/TMSS/backend/test/t_scheduling.py
index be283562973fef76d10e6eb7bb94c6357ac9a830..8c5e2c735747e4abd1a65cc7ba389ea0eba9387c 100755
--- a/SAS/TMSS/backend/test/t_scheduling.py
+++ b/SAS/TMSS/backend/test/t_scheduling.py
@@ -69,10 +69,10 @@ def create_subtask_object_for_testing(subtask_type_value, subtask_state_value):
     """
     Helper function to create a subtask object for testing with given subtask value and subtask state value
     as string (no object)
-    For these testcases 'pipeline control' and 'observation control' is relevant
+    For these testcases 'preprocessing pipeline' and 'observation control' are relevant
     """
     task_blueprint = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(specifications_template=models.TaskTemplate.objects.get(name='target observation' if subtask_type_value=='observation' else 'preprocessing pipeline')))
-    subtask_template_obj = models.SubtaskTemplate.objects.get(name="%s control" % subtask_type_value)
+    subtask_template_obj = models.SubtaskTemplate.objects.get(name='observation control' if subtask_type_value=='observation' else 'preprocessing pipeline')
     subtask_data = Subtask_test_data(subtask_template=subtask_template_obj)
     subtask = models.Subtask.objects.create(**subtask_data)
     subtask.task_blueprints.set([task_blueprint])
@@ -160,7 +160,8 @@ class SchedulingTest(unittest.TestCase):
         self.assertEqual([], duplicate_dataproduct_specification_docs)
 
     def test_schedule_observation_subtask_with_enough_resources_available(self):
-        spec = { "stations": { "digital_pointings": [ { "subbands": [0] } ] } }
+        spec = { "stations": { "digital_pointings": [ { "subbands": [0] } ] },
+                 "COBALT": { "correlator": { "enabled": True } } }
         self._test_schedule_observation_subtask_with_enough_resources_available(spec)
 
     def test_schedule_beamformer_observation_subtask_with_enough_resources_available(self):
@@ -193,6 +194,7 @@ class SchedulingTest(unittest.TestCase):
         task_blueprint = test_data_creator.post_data_and_get_response_as_json_object(task_blueprint_data, '/task_blueprint/')
         subtask_template = client.get_subtask_template("observation control")
         spec = get_default_json_object_for_schema(subtask_template['schema'])
+        spec['COBALT']['correlator']['enabled'] = True
         spec['stations']['digital_pointings'][0]['subbands'] = [0]
         cluster_url = client.get_path_as_json_object('/cluster/1')['url']
@@ -229,6 +231,7 @@ class SchedulingTest(unittest.TestCase):
 
         subtask_template = client.get_subtask_template("observation control")
         spec = get_default_json_object_for_schema(subtask_template['schema'])
+        spec['COBALT']['correlator']['enabled'] = True
         spec['stations']['digital_pointings'][0]['subbands'] = [0]
         spec['stations']['station_list'] = ['CS001', 'CS002', 'CS401']
@@ -268,9 +271,10 @@ class SchedulingTest(unittest.TestCase):
         task_blueprint = test_data_creator.post_data_and_get_response_as_json_object(task_blueprint_data,'/task_blueprint/')
         subtask_template = client.get_subtask_template("observation control")
         spec = get_default_json_object_for_schema(subtask_template['schema'])
+        spec['COBALT']['correlator']['enabled'] = True
         spec['stations']['digital_pointings'][0]['subbands'] = [0]
-        cluster_url = client.get_path_as_json_object('/cluster/1')['url']
         spec['stations']['station_list'] = ['CS001', 'CS002', 'CS003']
+        cluster_url = client.get_path_as_json_object('/cluster/1')['url']
 
         subtask_data = test_data_creator.Subtask(specifications_template_url=subtask_template['url'],
                                                  specifications_doc=spec,
                                                  cluster_url=cluster_url,
@@ -288,16 +292,13 @@ class SchedulingTest(unittest.TestCase):
         self.assertEqual('scheduled', subtask['state_value'])
         self.assertEqual('scheduled', tmss_test_env.ra_test_environment.radb.getTask(tmss_id=subtask_id)['status'])
 
-    def test_schedule_pipeline_subtask_with_enough_resources_available(self):
-        with tmss_test_env.create_tmss_client() as client:
+    def _setup_observation_and_pipeline(self, client, obs_spec, dataproduct_properties, pipeline_task_template_name, pipeline_subtask_template_name, pipeline_subtask_spec):
         cluster_url = client.get_path_as_json_object('/cluster/1')['url']
 
         # setup: first create an observation, so the pipeline can have input.
         obs_task_blueprint_data = test_data_creator.TaskBlueprint(template_url=client.get_task_template(name="target observation")['url'])
         obs_task_blueprint = test_data_creator.post_data_and_get_response_as_json_object(obs_task_blueprint_data, '/task_blueprint/')
         obs_subtask_template = client.get_subtask_template("observation control")
-        obs_spec = get_default_json_object_for_schema(obs_subtask_template['schema'])
-        obs_spec['stations']['digital_pointings'][0]['subbands'] = [0]
 
         obs_subtask_data = test_data_creator.Subtask(specifications_template_url=obs_subtask_template['url'],
                                                      specifications_doc=obs_spec,
@@ -306,16 +307,14 @@ class SchedulingTest(unittest.TestCase):
         obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/')
         obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url'], task_blueprint_url=obs_task_blueprint['url']), '/subtask_output/')
-        test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'],
-                                                                              specifications_doc={"sap": "target0", "subband": 0 },
-                                                                              subtask_output_url=obs_subtask_output_url), '/dataproduct/')
+        test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(**dataproduct_properties, subtask_output_url=obs_subtask_output_url), '/dataproduct/')
 
         # now create the pipeline...
-        pipe_task_blueprint_data = test_data_creator.TaskBlueprint(template_url=client.get_task_template(name="preprocessing pipeline")['url'])
+        pipe_task_blueprint_data = test_data_creator.TaskBlueprint(template_url=client.get_task_template(name=pipeline_task_template_name)['url'])
         pipe_task_blueprint = test_data_creator.post_data_and_get_response_as_json_object(pipe_task_blueprint_data, '/task_blueprint/')
 
-        pipe_subtask_template = client.get_subtask_template("pipeline control")
-        pipe_spec = get_default_json_object_for_schema(pipe_subtask_template['schema'])
+        pipe_subtask_template = client.get_subtask_template(pipeline_subtask_template_name)
+        pipe_spec = add_defaults_to_json_object_for_schema(pipeline_subtask_spec, pipe_subtask_template['schema'])
 
         pipe_subtask_data = test_data_creator.Subtask(specifications_template_url=pipe_subtask_template['url'],
                                                       specifications_doc=pipe_spec,
@@ -333,6 +332,56 @@ class SchedulingTest(unittest.TestCase):
                 client.set_subtask_status(predecessor['id'], state)
 
         client.set_subtask_status(pipe_subtask['id'], 'defined')
+
+        return pipe_subtask
+
+    def test_schedule_preprocessing_pipeline_subtask_with_enough_resources_available(self):
+        with tmss_test_env.create_tmss_client() as client:
+            obs_subtask_template = client.get_subtask_template("observation control")
+            obs_spec = get_default_json_object_for_schema(obs_subtask_template['schema'])
+            obs_spec['stations']['digital_pointings'][0]['subbands'] = [0]
+            obs_spec['COBALT']['correlator']['enabled'] = True
+
+            pipe_subtask = self._setup_observation_and_pipeline(client,
+                                                                obs_spec,
+                                                                {"filename": "L123456_SB000.MS",
+                                                                 "specifications_doc": {"sap": "target0", "subband": 0 } },
+                                                                "preprocessing pipeline",
+                                                                "preprocessing pipeline",
+                                                                {})
+
+            subtask = client.schedule_subtask(pipe_subtask['id'])
+
+            self.assertEqual('scheduled', subtask['state_value'])
+            self.assertEqual('scheduled', tmss_test_env.ra_test_environment.radb.getTask(tmss_id=pipe_subtask['id'])['status'])
+
+    def test_schedule_pulsar_pipeline_subtask_with_enough_resources_available(self):
+        with tmss_test_env.create_tmss_client() as client:
+            obs_subtask_template = client.get_subtask_template("observation control")
client.get_subtask_template("observation control") + obs_spec = { + "stations": { "digital_pointings": [ { "name": "target0", "subbands": [0] } ] }, + "COBALT": { + "version": 1, + "correlator": { "enabled": False }, + "beamformer": { + "tab_pipelines": [ + { + "SAPs": [ { "name": "target0", "tabs": [ { "coherent": False }, { "coherent": True } ] } ] + } + ] + } + } + } + obs_spec = add_defaults_to_json_object_for_schema(obs_spec,obs_subtask_template['schema']) + + pipe_subtask = self._setup_observation_and_pipeline(client, + obs_spec, + {"filename": "L123456_SAP000_B000_S0_P000.h5", + "specifications_doc": { "sap": "target0", "coherent": True, "identifiers": { "sap_index": 0, "tab_index": 0, "pipeline_index": 0, "part_index": 0, "stokes_index": 0 } } }, + "pulsar pipeline", + "pulsar pipeline", + {}) + subtask = client.schedule_subtask(pipe_subtask['id']) self.assertEqual('scheduled', subtask['state_value']) diff --git a/SAS/TMSS/backend/test/t_scheduling_units.py b/SAS/TMSS/backend/test/t_scheduling_units.py index 98234e7d6bee7b43c22f395e402196538683b288..af237301a8991c0226b10b5eee3a251bbc652cf6 100644 --- a/SAS/TMSS/backend/test/t_scheduling_units.py +++ b/SAS/TMSS/backend/test/t_scheduling_units.py @@ -87,7 +87,7 @@ class SchedulingUnitBlueprintStateTest(unittest.TestCase): task_pipe.specifications_template = models.TaskTemplate.objects.get(type=models.TaskType.Choices.PIPELINE.value) task_pipe.save() subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value="defined"), - subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) + subtask_template=models.SubtaskTemplate.objects.get(name='preprocessing pipeline')) if "pipeline" in skip_create_subtask: subtask_pipe = None else: @@ -102,10 +102,11 @@ class SchedulingUnitBlueprintStateTest(unittest.TestCase): task_ingest = models.TaskBlueprint.objects.create(**task_data) task_ingest.specifications_template = my_test_template task_ingest.save() - # There is no template defined for ingest yet ...but I can use pipeline control, only the template type matters + # There is no template defined for ingest yet ...but I can use preprocessing pipeline, only the template type matters # ....should become other thing in future but for this test does not matter subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value="defined"), - subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) + subtask_template=models.SubtaskTemplate.objects.get(name='preprocessing pipeline')) + if "ingest" in skip_create_subtask: subtask_ingest = None else: diff --git a/SAS/TMSS/backend/test/t_subtasks.py b/SAS/TMSS/backend/test/t_subtasks.py index a59c19f89859090eac56e8b9d9d43425389eedeb..0faaec26e42863a8183d2a6fbb0226cbc3805723 100755 --- a/SAS/TMSS/backend/test/t_subtasks.py +++ b/SAS/TMSS/backend/test/t_subtasks.py @@ -215,7 +215,7 @@ class SubTasksCreationFromTaskBluePrint(unittest.TestCase): create_relation_task_blueprint_object_for_testing(task_blueprint, task_blueprint_preprocessing) subtask = create_preprocessing_subtask_from_task_blueprint(task_blueprint_preprocessing) self.assertEqual("defined", str(subtask.state)) - self.assertEqual("pipeline control", str(subtask.specifications_template.name)) + self.assertEqual("preprocessing pipeline", str(subtask.specifications_template.name)) self.assertEqual("pipeline", str(subtask.specifications_template.type)) def test_create_subtasks_from_task_blueprint_succeed(self): @@ -354,6 +354,88 @@ class 
         self.assertIn("results in 600 total subbands, but only 488 are possible", str(cm.exception))
 
+class SubTasksCreationFromTaskBluePrintCalibrator(unittest.TestCase):
+
+    def test_create_sequence_of_subtask_from_task_blueprint_calibrator_failure(self):
+        """
+        Create multiple subtasks from a task blueprint when the task is a calibrator
+        Check that an exception occurs due to a missing related target observation
+        """
+        task_blueprint = create_task_blueprint_object_for_testing(task_template_name="calibrator observation")
+        with self.assertRaises(SubtaskCreationException):
+            create_observation_control_subtask_from_task_blueprint(task_blueprint)
+
+    @unittest.skip("JS 2020-09-08: Cannot reproduce SubtaskCreationException. How is this test supposed to work??")
+    def test_create_sequence_of_subtask_from_task_blueprint_calibrator(self):
+        """
+        Create multiple subtasks from a task blueprint when the task is a calibrator and is related to the task blueprint
+        of a target observation
+        Check that an exception occurs due to a missing pointing setting in the target observation,
+        since the calibrator default is AutoSelect=True
+        Check NO exception when AutoSelect=False
+        """
+        cal_task_blueprint = create_task_blueprint_object_for_testing(task_template_name="calibrator observation")
+        target_task_blueprint = create_task_blueprint_object_for_testing()
+        create_scheduling_relation_task_blueprint_for_testing(cal_task_blueprint, target_task_blueprint)
+
+        with self.assertRaises(SubtaskCreationException):
+            create_observation_control_subtask_from_task_blueprint(cal_task_blueprint)
+
+        cal_task_blueprint.specifications_doc['autoselect'] = False
+        cal_task_blueprint.specifications_doc['pointing']['angle1'] = 1.111
+        cal_task_blueprint.specifications_doc['pointing']['angle2'] = 2.222
+        subtask = create_observation_control_subtask_from_task_blueprint(cal_task_blueprint)
+        self.assertEqual("defined", str(subtask.state))
+        self.assertEqual("observation control", str(subtask.specifications_template.name))
+        self.assertEqual("observation", str(subtask.specifications_template.type))
+        self.assertEqual('J2000', subtask.specifications_doc['stations']['analog_pointing']['direction_type'])
+        self.assertEqual(1.111, subtask.specifications_doc['stations']['analog_pointing']['angle1'])
+        self.assertEqual(2.222, subtask.specifications_doc['stations']['analog_pointing']['angle2'])
+
+
+class SubTaskCreationFromTaskBlueprintPipelines(unittest.TestCase):
+
+    def test_create_subtask_from_task_blueprint_preprocessing_pipeline(self):
+        """
+        Test that a preprocessing task blueprint can be turned into a preprocessing pipeline subtask
+        """
+
+        # setup
+        observation_task_blueprint = create_task_blueprint_object_for_testing()
+        pipeline_task_blueprint = create_task_blueprint_object_for_testing(task_template_name="preprocessing pipeline")
+        create_relation_task_blueprint_object_for_testing(observation_task_blueprint, pipeline_task_blueprint)
+
+        create_observation_control_subtask_from_task_blueprint(observation_task_blueprint)
+
+        # trigger
+        subtask = create_preprocessing_subtask_from_task_blueprint(pipeline_task_blueprint)
+
+        # assert
+        self.assertEqual("defined", str(subtask.state))
+        self.assertEqual("preprocessing pipeline", str(subtask.specifications_template.name))
+        self.assertEqual(models.SubtaskType.Choices.PIPELINE.value, str(subtask.specifications_template.type))
+
+    def test_create_subtask_from_task_blueprint_pulsar_pipeline(self):
+        """
+        Test that a pulsar task blueprint can be turned into a pulsar pipeline subtask
+        """
+
+        # setup
+        observation_task_blueprint = create_task_blueprint_object_for_testing()
+        pipeline_task_blueprint = create_task_blueprint_object_for_testing(task_template_name="pulsar pipeline")
+        create_relation_task_blueprint_object_for_testing(observation_task_blueprint, pipeline_task_blueprint)
+
+        create_observation_control_subtask_from_task_blueprint(observation_task_blueprint)
+
+        # trigger
+        subtask = create_pulsar_pipeline_subtask_from_task_blueprint(pipeline_task_blueprint)
+
+        # assert
+        self.assertEqual("defined", str(subtask.state))
+        self.assertEqual("pulsar pipeline", str(subtask.specifications_template.name))
+        self.assertEqual(models.SubtaskType.Choices.PIPELINE.value, str(subtask.specifications_template.type))
+
+
 class SubTaskCreationFromTaskBlueprintIngest(unittest.TestCase):
 
     def test_create_subtask_from_task_blueprint_ingest(self):
diff --git a/SAS/TMSS/backend/test/t_tasks.py b/SAS/TMSS/backend/test/t_tasks.py
index 1f7a77a95410e68f3e3d46c08658b7a51a128f1b..27dd9ebe6a90ed313b9d3817ed1113ea9c6a4408 100755
--- a/SAS/TMSS/backend/test/t_tasks.py
+++ b/SAS/TMSS/backend/test/t_tasks.py
@@ -272,7 +272,7 @@ class TaskBlueprintStateTest(unittest.TestCase):
         task_blueprint_data = TaskBlueprint_test_data(name="Task Blueprint With One Subtask")
         task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data)
         # Create pipeline subtask related to taskblueprint
-        subtask_data = Subtask_test_data(subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control'))
+        subtask_data = Subtask_test_data(subtask_template=models.SubtaskTemplate.objects.get(name='preprocessing pipeline'))
         subtask_pipe = models.Subtask.objects.create(**subtask_data)
         subtask_pipe.task_blueprints.set([task_blueprint])
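One detail worth noting about the renamed pulsar test above: inside a class body, a second def with the same name simply rebinds the attribute, so unittest would only ever collect the later method and the preprocessing test would silently never run. A self-contained illustration (class and method names are made up):

import unittest

class Example(unittest.TestCase):
    def test_pipeline(self):
        self.fail("never runs: the def below rebinds 'test_pipeline'")

    def test_pipeline(self):  # noqa: F811 -- flake8 flags exactly this redefinition
        pass  # only this binding remains on the class, so only it is collected

if __name__ == "__main__":
    unittest.main()  # reports 1 test, not 2

Giving the two pipeline tests distinct names keeps both in the test run.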