Skip to content
Snippets Groups Projects
Commit dcf25c26 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-2829: initial creation of copy subtask. todo: copy task settings, manage output if needed

parent ab4e1a5c
No related branches found
No related tags found
1 merge request!1296Resolve TMSS-2829
......@@ -88,6 +88,7 @@ def create_or_update_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint)
create_qaplots_subtask_from_task_blueprint],
'preprocessing pipeline': [create_preprocessing_subtask_from_task_blueprint],
'pulsar pipeline': [create_pulsar_pipeline_subtask_from_task_blueprint],
'copy pipeline': [create_copy_pipeline_subtask_from_task_blueprint],
'ingest': [create_ingest_subtask_from_task_blueprint],
'cleanup': [create_cleanup_subtask_from_task_blueprint]}
generators_mapping['calibrator observation'] = generators_mapping['target observation']
......@@ -989,6 +990,50 @@ def create_cleanup_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) ->
return subtask
def create_copy_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
# step 0: check pre-requisites
check_prerequities_for_subtask_creation(task_blueprint)
# step 1: create subtask in defining state, with filled-in subtask_template
copy_subtask_template = SubtaskTemplate.get_version_or_latest(name="copy pipeline")
copy_subtask_spec = copy_subtask_template.get_default_json_document_for_schema()
# copy_subtask_spec['']
copy_subtask_data = { "scheduled_start_time": None,
"scheduled_stop_time": None,
"state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
"task_blueprint": task_blueprint,
"specifications_template": copy_subtask_template,
"specifications_doc": copy_subtask_spec,
"primary": True } #,
#"cluster": observation_subtask.cluster}
copy_subtask = Subtask.objects.create(**copy_subtask_data)
# step 2: create and link subtask input/output
selection_template = TaskRelationSelectionTemplate.get_version_or_latest(name="all")
selection_doc = selection_template.get_default_json_document_for_schema()
# for obs_out in observation_subtask.outputs.filter(output_role__role__value=Role.Choices.CORRELATOR.value).all():
# qafile_subtask_input = SubtaskInput.objects.create(subtask=qafile_subtask,
# producer=obs_out, # TODO: determine proper producer based on spec in task_relation_blueprint
# input_role=None, # TODO: determine proper role based on spec in task_relation_blueprint
# selection_doc=selection_doc,
# selection_template=selection_template)
#
# # create an internal SubtaskOutput (output_role=None), because we do not expose qa data yet at task level
# qafile_subtask_output = SubtaskOutput.objects.create(subtask=qafile_subtask, output_role=None, filesystem=_output_filesystem(qafile_subtask))
# step 3: set state to DEFINED
copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
copy_subtask.save()
# done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this copy_subtask
return copy_subtask
# ==== various schedule* methods to schedule a Subtasks (if possible) ====
# LOFAR needs to have a gap in between observations to (re)initialize hardware.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment