Skip to content
Snippets Groups Projects
Commit d0ab7097 authored by Jörn Künsemöller's avatar Jörn Künsemöller
Browse files

Merge branch 'TMSS-318' into 'master'

Resolve TMSS-318

Closes TMSS-318

See merge request !261
parents 616f5c81 e21be007
No related branches found
No related tags found
2 merge requests!261Resolve TMSS-318,!260syncing cob-master with master again
{
"$id":"http://tmss.lofar.org/api/schemas/subtasktemplate/ingest control/1#",
"$schema": "http://json-schema.org/draft-06/schema#",
"title":"ingest control",
"description":"This schema defines the parameters to setup and control an ingest subtask.",
"version":1,
"type": "object",
"properties": {
},
"required": [
]
}
{
"$id": "http://tmss.lofar.org/api/schemas/tasktemplate/ingest/1#",
"$schema": "http://json-schema.org/draft-06/schema#",
"title": "ingest",
"description": "This schema defines the parameters to setup an ingest task.",
"version": 1,
"type": "object",
"properties": {
},
"required": [
]
}
......@@ -112,5 +112,15 @@
{
"file_name": "sap_template-1.json",
"template": "sap_template"
},
{
"file_name": "subtask_template-ingest-1.json",
"template": "subtask_template",
"type": "copy"
},
{
"file_name": "task_template-ingest-1.json",
"template": "task_template",
"type": "ingest"
}
]
\ No newline at end of file
......@@ -49,7 +49,8 @@ def create_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subta
generators_mapping = {'target observation': [create_observation_control_subtask_from_task_blueprint,
create_qafile_subtask_from_task_blueprint,
create_qaplots_subtask_from_task_blueprint],
'preprocessing pipeline': [create_preprocessing_subtask_from_task_blueprint]}
'preprocessing pipeline': [create_preprocessing_subtask_from_task_blueprint],
'ingest': [create_ingest_subtask_from_task_blueprint]}
generators_mapping['calibrator observation'] = generators_mapping['target observation']
template_name = task_blueprint.specifications_template.name
......@@ -450,6 +451,50 @@ def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBluepri
return subtask
def create_ingest_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
''' Create a subtask to for an ingest job
This method implements "Instantiate subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
'''
# step 0: check pre-requisites
check_prerequities_for_subtask_creation(task_blueprint)
# step 1: create subtask in defining state, with filled-in subtask_template
subtask_template = SubtaskTemplate.objects.get(name='ingest control')
default_subtask_specs = get_default_json_object_for_schema(subtask_template.schema)
subtask_specs = default_subtask_specs # todo: translate specs from task to subtask once we have non-empty templates
cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
subtask_data = {"start_time": None,
"stop_time": None,
"state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
"task_blueprint": task_blueprint,
"specifications_template": subtask_template,
"specifications_doc": subtask_specs,
"priority": 1,
"schedule_method": ScheduleMethod.objects.get(value=ScheduleMethod.Choices.DYNAMIC.value),
"cluster": Cluster.objects.get(name=cluster_name)}
subtask = Subtask.objects.create(**subtask_data)
# step 2: create and link subtask input
for task_relation_blueprint in task_blueprint.produced_by.all():
producing_task_blueprint = task_relation_blueprint.producer
predecessor_subtasks = [st for st in producing_task_blueprint.subtasks.order_by('id').all()]
for predecessor_subtask in predecessor_subtasks:
for predecessor_subtask_output in predecessor_subtask.outputs.all():
SubtaskInput.objects.create(subtask=subtask,
producer=predecessor_subtask_output,
selection_doc=task_relation_blueprint.selection_doc,
selection_template=task_relation_blueprint.selection_template)
# step 3: set state to DEFINED
subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
subtask.save()
# done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this ingest
return subtask
# ==== various schedule* methods to schedule a Subtasks (if possible) ====
def schedule_subtask(subtask: Subtask) -> Subtask:
......@@ -469,6 +514,9 @@ def schedule_subtask(subtask: Subtask) -> Subtask:
if subtask.specifications_template.type.value == SubtaskType.Choices.QA_PLOTS.value:
return schedule_qaplots_subtask(subtask)
if subtask.specifications_template.type.value == SubtaskType.Choices.COPY.value:
return schedule_copy_subtask(subtask)
raise SubtaskSchedulingException("Cannot schedule subtask id=%d because there is no schedule-method known for this subtasktype=%s." %
(subtask.pk, subtask.specifications_template.type.value))
except Exception as e:
......@@ -865,6 +913,59 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask):
return pipeline_subtask
def schedule_copy_subtask(copy_subtask: Subtask):
''' Schedule the given copy_subtask
This method should typically be called upon the event of an predecessor (pipeline or observation) subtask being finished.
This method implements "Scheduling subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
'''
# step 0: check pre-requisites
check_prerequities_for_scheduling(copy_subtask)
if copy_subtask.specifications_template.type.value != SubtaskType.Choices.COPY.value:
raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (copy_subtask.pk,
copy_subtask.specifications_template.type,
SubtaskType.Choices.COPY.value))
# step 1: set state to SCHEDULING
copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
copy_subtask.save()
# step 1a: check start/stop times
# not very relevant for tmss/dynamic scheduling, but the resource assigner demands it.
if copy_subtask.start_time is None:
now = datetime.utcnow()
logger.info("copy id=%s has no starttime. assigned default: %s", copy_subtask.pk, formatDatetime(now))
copy_subtask.start_time = now
if copy_subtask.stop_time is None:
stop_time = copy_subtask.start_time + timedelta(hours=+1)
logger.info("copy id=%s has no stop_time. assigned default: %s", copy_subtask.pk, formatDatetime(stop_time))
copy_subtask.stop_time = stop_time
# step 2: link input dataproducts
if copy_subtask.inputs.count() == 0:
raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (copy_subtask.pk,
copy_subtask.specifications_template.type))
# iterate over all inputs
for copy_subtask_input in copy_subtask.inputs.all():
# select and set input dataproducts that meet the filter defined in selection_doc
dataproducts = [dataproduct for dataproduct in copy_subtask_input.producer.dataproducts.all()
if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, copy_subtask_input.selection_doc)]
copy_subtask_input.dataproducts.set(dataproducts)
# todo: I assume that there is no RA involvement here? If there is, how does a copy parset look like?
# step 4: resource assigner (if possible)
#_assign_resources(copy_subtask)
# step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it)
copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
copy_subtask.save()
return copy_subtask
# === Misc ===
def create_and_schedule_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subtask]:
......
......@@ -189,6 +189,49 @@ class SchedulingTest(unittest.TestCase):
self.assertEqual('scheduled', subtask['state_value'])
self.assertEqual('scheduled', ra_test_env.radb.getTask(tmss_id=pipe_subtask['id'])['status'])
def test_schedule_ingest_subtask(self):
with tmss_test_env.create_tmss_client() as client:
cluster_url = client.get_path_as_json_object('/cluster/1')['url']
# setup: first create an observation, so the ingest can have input.
obs_subtask_template = client.get_subtask_template("observation control")
obs_spec = get_default_json_object_for_schema(obs_subtask_template['schema'])
obs_spec['stations']['digital_pointings'][0]['subbands'] = [0]
obs_subtask_data = test_data_creator.Subtask(specifications_template_url=obs_subtask_template['url'],
specifications_doc=obs_spec,
cluster_url=cluster_url,
task_blueprint_url=test_data_creator.post_data_and_get_url(test_data_creator.TaskBlueprint(), '/task_blueprint/'))
obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/')
obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/')
test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'],
subtask_output_url=obs_subtask_output_url), '/dataproduct/')
# now create the ingest...
ingest_subtask_template = client.get_subtask_template("ingest control")
ingest_spec = get_default_json_object_for_schema(ingest_subtask_template['schema'])
ingest_subtask_data = test_data_creator.Subtask(specifications_template_url=ingest_subtask_template['url'],
specifications_doc=ingest_spec,
task_blueprint_url=obs_subtask['task_blueprint'],
cluster_url=cluster_url)
ingest_subtask = test_data_creator.post_data_and_get_response_as_json_object(ingest_subtask_data, '/subtask/')
# ...and connect it to the observation
test_data_creator.post_data_and_get_url(test_data_creator.SubtaskInput(subtask_url=ingest_subtask['url'], subtask_output_url=obs_subtask_output_url), '/subtask_input/')
test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=ingest_subtask['url']), '/subtask_output/')
for predecessor in client.get_subtask_predecessors(ingest_subtask['id']):
client.set_subtask_status(predecessor['id'], 'finished')
client.set_subtask_status(ingest_subtask['id'], 'defined')
# trigger
subtask = client.schedule_subtask(ingest_subtask['id'])
# assert
self.assertEqual('scheduled', subtask['state_value'])
self.assertEqual(models.Subtask.objects.get(id=ingest_subtask['id']).inputs.first().dataproducts.count(), 1)
def test_schedule_schedulingunit_enough_resources_available(self):
'''similar test as test_schedule_pipeline_subtask_with_enough_resources_available, but now created from a scheduling_unit'''
......
......@@ -283,6 +283,25 @@ class SubTasksCreationFromTaskBluePrintCalibrator(unittest.TestCase):
self.assertEqual(2.222, subtask.specifications_doc['stations']['analog_pointing']['angle2'])
class SubTaskCreationFromTaskBlueprintIngest(unittest.TestCase):
def test_create_subtask_from_task_blueprint_ingest(self):
"""
Test that ingest task blueprint can be turned into a ingest control subtask
"""
# setup
ingest_task_blueprint = create_task_blueprint_object_for_testing(task_template_name="ingest")
# trigger
subtask = create_ingest_subtask_from_task_blueprint(ingest_task_blueprint)
# assert
self.assertEqual("defined", str(subtask.state))
self.assertEqual("ingest control", str(subtask.specifications_template.name))
self.assertEqual("copy", str(subtask.specifications_template.type))
class SubtaskInputSelectionFilteringTest(unittest.TestCase):
def setUp(self) -> None:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment