diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 243cb8b3ddbc8729a94b61606a3a7a4c93b5be42..f993fe99c66e614c232a8c1ad92ac7244ac97416 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -57,7 +57,8 @@ def create_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subta create_qafile_subtask_from_task_blueprint, create_qaplots_subtask_from_task_blueprint], 'preprocessing pipeline': [create_preprocessing_subtask_from_task_blueprint], - 'ingest': [create_ingest_subtask_from_task_blueprint]} + 'ingest': [create_ingest_subtask_from_task_blueprint], + 'cleanup': [create_cleanup_subtask_from_task_blueprint]} generators_mapping['calibrator observation'] = generators_mapping['target observation'] generators_mapping['beamforming observation'] = [create_observation_control_subtask_from_task_blueprint] @@ -625,6 +626,49 @@ def create_ingest_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> return subtask +def create_cleanup_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask: + ''' Create a subtask for a cleanup job + This method implements "Instantiate subtasks" step from the "Specification Flow" + https://support.astron.nl/confluence/display/TMSS/Specification+Flow + ''' + # step 0: check pre-requisites + check_prerequities_for_subtask_creation(task_blueprint) + + # step 1: create subtask in defining state, with filled-in subtask_template + subtask_template = SubtaskTemplate.objects.get(name='cleanup') + subtask_specs = get_default_json_object_for_schema(subtask_template.schema) + cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4") + subtask_data = {"start_time": None, + "stop_time": None, + "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value), + "task_blueprint": task_blueprint, + "specifications_template": subtask_template, + "specifications_doc": subtask_specs, + "cluster": Cluster.objects.get(name=cluster_name)} + subtask = Subtask.objects.create(**subtask_data) + + # step 2: create and link subtask input + # for this cleanup subtask an 'input' seems a bit weird, but it actually makes sense! + # this cleanup subtask will cleanup the output data of all linked input predecessors. + for task_relation_blueprint in task_blueprint.produced_by.all(): + producing_task_blueprint = task_relation_blueprint.producer + + predecessor_subtasks = [st for st in producing_task_blueprint.subtasks.filter(specifications_template__type__value__in=(SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value)).order_by('id').all()] + for predecessor_subtask in predecessor_subtasks: + for predecessor_subtask_output in predecessor_subtask.outputs.all(): + SubtaskInput.objects.create(subtask=subtask, + producer=predecessor_subtask_output, + selection_doc=task_relation_blueprint.selection_doc, + selection_template=task_relation_blueprint.selection_template) + + # step 3: set state to DEFINED + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) + subtask.save() + + # done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this ingest + return subtask + + # ==== various schedule* methods to schedule a Subtasks (if possible) ==== def schedule_subtask(subtask: Subtask) -> Subtask: