diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py index 00ed6e2a27944488a317e540fa46c972b8b0f13e..313aaf8090155c185fcc8ee7b62243dd52c8f74b 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py @@ -439,7 +439,7 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) # DPPP steps dppp_steps = [] - if "preflagger0" in spec: + if spec["preflagger0"]["enabled"]: dppp_steps.append('preflagger[0]') parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].chan"] = "[%s]" % spec["preflagger0"]["channels"] parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].abstime"] = "[]" @@ -458,7 +458,7 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].timeslot"] = "[]" parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].type"] = "preflagger" - if 'preflagger1' in spec: + if spec["preflagger1"]["enabled"]: dppp_steps.append('preflagger[1]') parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].corrtype"] = spec["preflagger1"]["corrtype"] parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].abstime"] = "[]" @@ -477,7 +477,7 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].timeslot"] = "[]" parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].type"] = "preflagger" - if 'aoflagger' in spec: + if spec["aoflagger"]["enabled"]: dppp_steps.append('aoflagger') parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.strategy"] = spec["aoflagger"]["strategy"] parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.autocorr"] = "F" @@ -493,7 +493,7 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.timewindow"] = "0" parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.type"] = "aoflagger" - if "demixer" in spec: + if spec["demixer"]["enabled"]: dppp_steps.append('demixer') parset["Observation.ObservationControl.PythonControl.DPPP.demixer.baseline"] = spec["demixer"]["baselines"] parset["Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep"] = spec["demixer"]["demix_frequency_steps"] @@ -514,6 +514,10 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) parset["Observation.ObservationControl.PythonControl.DPPP.demixer.subtractsources"] = "" parset["Observation.ObservationControl.PythonControl.DPPP.demixer.targetsource"] = "" parset["Observation.ObservationControl.PythonControl.DPPP.demixer.type"] = "demixer" + else: + # ResourceEstimator wants these keys always + parset["Observation.ObservationControl.PythonControl.DPPP.demixer.freqstep"] = 1 + parset["Observation.ObservationControl.PythonControl.DPPP.demixer.timestep"] = 1 parset["Observation.ObservationControl.PythonControl.DPPP.steps"] = "[%s]" % ",".join(dppp_steps) parset["Observation.ObservationControl.PythonControl.DPPP.msout.storagemanager.name"] = spec["storagemanager"] diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py index 76e057c97456f40d5b35c670c27f1f60d9d88ccc..30a2d4029769070ebf204aeda4fada4565e59f1b 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py @@ -2,6 +2,7 @@ from lofar.sas.tmss.tmss.exceptions import * from lofar.sas.tmss.tmss.tmssapp.models.scheduling import Dataproduct, SubtaskType, Subtask, SubtaskOutput, SIPidentifier, Algorithm from lofar.sas.tmss.tmss.tmssapp.models.specification import Datatype, Dataformat from lofar.lta.sip import siplib, ltasip, validator, constants +from lofar.common.json_utils import add_defaults_to_json_object_for_schema import uuid import logging @@ -182,13 +183,14 @@ def create_sip_representation_for_subtask(subtask: Subtask): process_map=process_map) if subtask.specifications_template.name == "pipeline control": # todo: re-evaluate this because schema name might change + spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, subtask.specifications_template.schema) pipeline = siplib.AveragingPipeline( # <-- this is what we need for UC1 pipeline_map, numberofcorrelateddataproducts=get_number_of_dataproducts_of_type(subtask, Dataformat.Choices.MEASUREMENTSET.value), - frequencyintegrationstep=subtask.specifications_doc.get('demixer',{}).get('frequency_steps', 0), - timeintegrationstep=subtask.specifications_doc.get('demixer',{}).get('time_step', 0), - flagautocorrelations=subtask.task_blueprint.specifications_doc["flag"]["autocorrelations"], - demixing=True if 'demix' in subtask.task_blueprint.specifications_doc else False + frequencyintegrationstep=spec['demixer']['frequency_steps'] if spec['demixer']['enabled'] else 1, + timeintegrationstep=spec['demixer']['time_steps'] if spec['demixer']['enabled'] else 1, + flagautocorrelations=spec['preflagger1']['enabled'] and spec['preflagger1']['corrtype'] == 'auto', + demixing=spec['demixer']['enabled'] and (spec['demixer']['demix_always'] or spec['demixer']['demix_if_needed']) ) # todo: distinguish and create other pipeline types. Probably most of these can be filled in over time as needed, # but they are not required for UC1. Here are stubs to start from for the other types the LTA supports: diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/UC1-scheduling-unit-observation-strategy.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/UC1-scheduling-unit-observation-strategy.json index 6df8766207e5752434d1bd9ea19e17a3bf0f5af6..33a51e3c0f967a083a8cd8e212f68eddfed5f3bb 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/UC1-scheduling-unit-observation-strategy.json +++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/UC1-scheduling-unit-observation-strategy.json @@ -20,7 +20,7 @@ "tags": [], "specifications_doc": { "flag": { - "rfi_strategy": "auto", + "rfi_strategy": "HBAdefault", "outerchannels": true, "autocorrelations": true }, @@ -115,7 +115,7 @@ "tags": [], "specifications_doc": { "flag": { - "rfi_strategy": "auto", + "rfi_strategy": "HBAdefault", "outerchannels": true, "autocorrelations": true }, @@ -138,7 +138,7 @@ "tags": [], "specifications_doc": { "flag": { - "rfi_strategy": "auto", + "rfi_strategy": "HBAdefault", "outerchannels": true, "autocorrelations": true }, @@ -176,7 +176,7 @@ "tags": [], "specifications_doc": { "flag": { - "rfi_strategy": "auto", + "rfi_strategy": "HBAdefault", "outerchannels": true, "autocorrelations": true }, diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/short-observation-pipeline-ingest-scheduling-unit-observation-strategy.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/short-observation-pipeline-ingest-scheduling-unit-observation-strategy.json index 4e9fab905cf2533dc7ca1dc36d1f3b9052a0b147..bd7eea6fc5ab98a051c05833e09c7baec4604a42 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/short-observation-pipeline-ingest-scheduling-unit-observation-strategy.json +++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/short-observation-pipeline-ingest-scheduling-unit-observation-strategy.json @@ -52,7 +52,7 @@ "tags": [], "specifications_doc": { "flag": { - "rfi_strategy": "auto", + "rfi_strategy": "HBAdefault", "outerchannels": true, "autocorrelations": true }, diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-pipeline-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-pipeline-1.json index 8307de613566df0b7a19d2417a24b740d3f41e7a..e52ab545b6fb1fc8224b83a9144f880dbd0fed1f 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-pipeline-1.json +++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/subtask_template-pipeline-1.json @@ -12,6 +12,11 @@ "type": "object", "additionalProperties": false, "properties": { + "enabled": { + "type": "boolean", + "title": "Enabled", + "default": false + }, "channels": { "title": "Channels", "type": "string", @@ -19,7 +24,7 @@ } }, "required": [ - "channels" + "enabled" ], "default": {} }, @@ -29,6 +34,11 @@ "type": "object", "additionalProperties": false, "properties": { + "enabled": { + "type": "boolean", + "title": "Enabled", + "default": false + }, "corrtype": { "title": "Correlations", "type": "string", @@ -41,7 +51,7 @@ } }, "required": [ - "corrtype" + "enabled" ], "default": {} }, @@ -51,6 +61,11 @@ "type": "object", "additionalProperties": false, "properties": { + "enabled": { + "type": "boolean", + "title": "Enabled", + "default": false + }, "strategy": { "title": "Strategy", "type": "string", @@ -62,7 +77,7 @@ } }, "required": [ - "strategy" + "enabled" ], "default": {} }, @@ -72,6 +87,11 @@ "type": "object", "additionalProperties": false, "properties": { + "enabled": { + "type": "boolean", + "title": "Enabled", + "default": false + }, "baselines": { "title": "Baselines", "type": "string", @@ -142,14 +162,7 @@ } }, "required": [ - "baselines", - "frequency_steps", - "time_steps", - "demix_frequency_steps", - "demix_time_steps", - "ignore_target", - "demix_always", - "demix_if_needed" + "enabled" ], "default": {} }, @@ -164,6 +177,5 @@ } }, "required": [ - "storagemanager" ] } diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/task_template-preprocessing_pipeline-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/task_template-preprocessing_pipeline-1.json index 74278f49310705212c20f65d8afe9aa61fb6ed97..0c6e37c3eb7f976d4836e5354ee565726497499e 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/task_template-preprocessing_pipeline-1.json +++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/task_template-preprocessing_pipeline-1.json @@ -24,10 +24,9 @@ "rfi_strategy": { "type": "string", "title": "RFI flagging strategy", - "default": "auto", + "default": "HBAdefault", "enum": [ "none", - "auto", "HBAdefault", "LBAdefault" ] @@ -122,16 +121,7 @@ } }, "required": [ - "frequency_steps", - "time_steps", - "ignore_target", - "sources" ], - "options": { - "dependencies": { - "demix": true - } - }, "default": {} }, "storagemanager": { @@ -139,12 +129,12 @@ "title": "Storage Manager", "default": "dysco", "enum": [ - "basic", + "standard", "dysco" ] } }, "required": [ - "storagemanager" + "average" ] -} \ No newline at end of file +} diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 856c523be56c5a471099ab484f6eb04412b678a8..5c1513c829161770f6a6a8101976cbb03d0f5537 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -598,7 +598,8 @@ def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBluepri # step 1: create subtask in defining state, with filled-in subtask_template subtask_template = SubtaskTemplate.objects.get(name='pipeline control') default_subtask_specs = get_default_json_object_for_schema(subtask_template.schema) - subtask_specs = _generate_subtask_specs_from_preprocessing_task_specs(task_blueprint.specifications_doc, default_subtask_specs) + task_specs_with_defaults = add_defaults_to_json_object_for_schema(task_blueprint.specifications_doc, task_blueprint.specifications_template.schema) + subtask_specs = _generate_subtask_specs_from_preprocessing_task_specs(task_specs_with_defaults, default_subtask_specs) cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4") subtask_data = { "start_time": None, "stop_time": None, @@ -1524,63 +1525,44 @@ def schedule_independent_subtasks_in_task_blueprint(task_blueprint: TaskBlueprin def _generate_subtask_specs_from_preprocessing_task_specs(preprocessing_task_specs, default_subtask_specs): - # preprocessing task default spec: { - # "storagemanager": "dysco", - # "flag": {"outerchannels": true, "autocorrelations": true, "rfi_strategy": "auto"}, - # "demix": {"frequency_steps": 64, "time_steps": 10, "ignore_target": false, "sources": {}}, - # "average": {"frequency_steps": 4, "time_steps": 1}} - # pipelinecontrol subtask default spec: { - # "storagemanager": "dysco", - # "demixer": {"baselines": "CS*,RS*&", "frequency_steps": 4, "time_steps": 1, "demix_frequency_steps": 4, - # "demix_time_steps": 1, "ignore_target": false, "demix_always": [], "demix_if_needed": []}, - # "aoflagger": {"strategy": "HBAdefault"}, - # "preflagger0": {"channels": "0..nchan/32-1,31*nchan/32..nchan-1"}, - # "preflagger1": {"corrtype": "auto"}} - # todo: check that this is actually how these need to be translated # todo: especially check when defaults are NOT supposed to be set because the task implies to not include them - # todo: translate task "sources": {} - I guess this is demix_always/demix_if_needed? - # todo: set subtask demixer properties "baselines": "CS*,RS*&", "demix_always": [], "demix_if_needed": [] - - subtask_specs = {} - subtask_specs['storagemanager'] = preprocessing_task_specs.get('storagemanager', - default_subtask_specs.get('storagemanager')) - - # todo: we depend on valid json here with knowledge about required properties. To generalize, we need to expect things to not be there. - if 'demix' or 'average' in preprocessing_task_specs: - # todo: should we exclude defaults in subtask.demixer if only one of these is defined on the task? - subtask_specs['demixer'] = default_subtask_specs['demixer'] - if 'demix' in preprocessing_task_specs: - subtask_specs['demixer'].update({ - "demix_frequency_steps": preprocessing_task_specs['demix']['frequency_steps'], - "demix_time_steps": preprocessing_task_specs['demix']['time_steps'], - "ignore_target": preprocessing_task_specs['demix']['ignore_target'] - }), - if 'average' in preprocessing_task_specs: - subtask_specs['demixer'].update({ - "demix_frequency_steps": preprocessing_task_specs['demix']['frequency_steps'], - "frequency_steps": preprocessing_task_specs['average']['frequency_steps'], - "demix_time_steps": preprocessing_task_specs['demix']['time_steps'], - "time_steps": preprocessing_task_specs['average']['time_steps'], - "ignore_target": preprocessing_task_specs['demix']['ignore_target'] - }), - if 'flag' in preprocessing_task_specs: - if preprocessing_task_specs["flag"]["rfi_strategy"] != 'none': - subtask_specs.update({"aoflagger": {"strategy": preprocessing_task_specs["flag"]["rfi_strategy"]}}) - - if preprocessing_task_specs["flag"]["rfi_strategy"] == 'auto': - # todo: handle 'auto' properly: we need to determine input dataproduct type and set LBA or HBA accordingly - # either here or allow 'auto' in subtask json and translate it when we connect obs to pipe subtask - default_strategy = default_subtask_specs['aoflagger']['strategy'] - subtask_specs.update({"aoflagger": {"strategy": default_strategy}}) - logger.warning('Translating aoflagger "auto" strategy to "%s" without knowing whether that makes sense!' % default_strategy) - - if preprocessing_task_specs["flag"]["outerchannels"]: - subtask_specs.update({"preflagger0": {"channels": "0..nchan/32-1,31*nchan/32..nchan-1"}}) - - if preprocessing_task_specs["flag"]["autocorrelations"]: - subtask_specs.update({"preflagger1": {"corrtype": "auto"}}) + # todo: set subtask demixer properties "baselines": "CS*,RS*&" + + subtask_specs = default_subtask_specs + subtask_specs['storagemanager'] = preprocessing_task_specs['storagemanager'] + + # averaging (performed by the demixer) + subtask_specs["demixer"]["enabled"] = True + subtask_specs['demixer']["frequency_steps"] = preprocessing_task_specs['average']['frequency_steps'] + subtask_specs['demixer']["time_steps"] = preprocessing_task_specs['average']['time_steps'] + + # demixing + subtask_specs['demixer']["demix_frequency_steps"] = preprocessing_task_specs['demix']['frequency_steps'] + subtask_specs['demixer']["demix_time_steps"] = preprocessing_task_specs['demix']['time_steps'] + subtask_specs['demixer']["ignore_target"] = preprocessing_task_specs['demix']['ignore_target'] + subtask_specs['demixer']["demix_always"] = [source for source,strategy in preprocessing_task_specs['demix']['sources'].items() if strategy == "yes"] + subtask_specs['demixer']["demix_if_needed"] = [source for source,strategy in preprocessing_task_specs['demix']['sources'].items() if strategy == "auto"] + + # flagging + if preprocessing_task_specs["flag"]["rfi_strategy"] != 'none': + subtask_specs["aoflagger"]["enabled"] = True + subtask_specs["aoflagger"]["strategy"] = preprocessing_task_specs["flag"]["rfi_strategy"] + else: + subtask_specs["aoflagger"]["enabled"] = False + + if preprocessing_task_specs["flag"]["outerchannels"]: + subtask_specs["preflagger0"]["enabled"] = True + subtask_specs["preflagger0"]["channels"] = "0..nchan/32-1,31*nchan/32..nchan-1" + else: + subtask_specs["preflagger0"]["enabled"] = False + + if preprocessing_task_specs["flag"]["autocorrelations"]: + subtask_specs["preflagger1"]["enabled"] = True + subtask_specs["preflagger1"]["corrtype"] = "auto" + else: + subtask_specs["preflagger1"]["enabled"] = False return subtask_specs