diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py index 57f01d2b9190870e0ce9f94b4b4538f701fa4a86..4b8da3503ff22a4948d512009dd1b65327db9e25 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py @@ -192,37 +192,60 @@ def create_task_drafts_for_scheduling_unit_draft(scheduling_unit_draft: models.S """ logger.debug("create_task_drafts_for_scheduling_unit_draft(scheduling_unit_draft.id=%s, name='%s') ...", scheduling_unit_draft.pk, scheduling_unit_draft.name) - if len(specifications_doc.get("tasks", {})) > 0: - for task_name, task_definition in specifications_doc["tasks"].items(): + with transaction.atomic(): + # remove existing task relations which are not specified (anymore). + for task_rel in list(models.TaskRelationDraft.objects.filter(producer__scheduling_unit_draft__id=scheduling_unit_draft.id).all() | + models.TaskRelationDraft.objects.filter(consumer__scheduling_unit_draft__id=scheduling_unit_draft.id).all()): + if len([spec_task_rel for spec_task_rel in specifications_doc.get("task_relations", []) if spec_task_rel['producer']==task_rel.producer.name and spec_task_rel['consumer']==task_rel.consumer.name])==0: + # existing task_rel is not specified (anymore). Remove it. + task_rel.delete() + + # remove existing task scheduling relations which are not specified (anymore). + for task_sched_rel in list(models.TaskSchedulingRelationDraft.objects.filter(first__scheduling_unit_draft__id=scheduling_unit_draft.id).all() | + models.TaskSchedulingRelationDraft.objects.filter(first__scheduling_unit_draft__id=scheduling_unit_draft.id).all()): + if len([spec_task_sched_rel for spec_task_sched_rel in specifications_doc.get("task_scheduling_relations",[]) if spec_task_sched_rel['first']==task_sched_rel.first.name and spec_task_sched_rel['second']==task_sched_rel.second.name])==0: + # existing task_sched_rel is not specified (anymore). Remove it. + task_sched_rel.delete() + + # remove existing tasks which are not specified (anymore). + for task in scheduling_unit_draft.task_drafts.all(): + if len([spec_task_name for spec_task_name in specifications_doc.get("tasks",{}).keys() if spec_task_name==task.name])==0: + # existing task_sched_rel is not specified (anymore). Remove it. + task.delete() + + # add/update new/updated tasks + for task_name, task_definition in specifications_doc.get("tasks", {}).items(): task_template_name = task_definition["specifications_template"] task_template = models.TaskTemplate.objects.get(name=task_template_name) task_specifications_doc = task_definition["specifications_doc"] task_specifications_doc = add_defaults_to_json_object_for_schema(task_specifications_doc, task_template.schema, cache=_schema_cache) + logger.debug("creating/updating task draft... task_name='%s', task_template_name='%s'", task_template_name, task_template_name) + try: - logger.debug("creating task draft... task_name='%s', task_template_name='%s'", task_template_name, task_template_name) - - with transaction.atomic(): - task_draft = models.TaskDraft.objects.create(name=task_name, - description=task_definition.get("description",""), - tags=task_definition.get("tags",[]), - specifications_doc=task_specifications_doc, - copy_reason=models.CopyReason.objects.get(value='template'), - copies=None, - scheduling_unit_draft=scheduling_unit_draft, - specifications_template=task_template) - - logger.info("created task draft id=%s task_name='%s', task_template_name='%s'", task_draft.pk, task_name, task_template_name) - except IntegrityError as e: - if 'TaskDraft_unique_name_in_scheduling_unit' in str(e): - logger.info("task draft task_name='%s', task_template_name='%s' already exists in scheduling_unit id=%s name='%s'", - task_name, task_template_name, scheduling_unit_draft.id, scheduling_unit_draft.name) - else: - raise + task_draft = scheduling_unit_draft.task_drafts.get(name=task_name) + task_draft.description = task_definition.get("description", "") + task_draft.specifications_doc = task_specifications_doc + task_draft.copy_reason = models.CopyReason.objects.get(value='template') + task_draft.specifications_template = task_template + task_draft.save() + + logger.info("updated task draft id=%s task_name='%s', task_template_name='%s'", + task_draft.pk, task_name, task_template_name) + except models.TaskDraft.DoesNotExist: + task_draft = models.TaskDraft.objects.create(name=task_name, + description=task_definition.get("description", ""), + scheduling_unit_draft=scheduling_unit_draft, + specifications_doc = task_specifications_doc, + specifications_template=task_template, + copy_reason = models.CopyReason.objects.get(value='template')) + logger.info("created task draft id=%s task_name='%s', task_template_name='%s'", + task_draft.pk, task_name, task_template_name) + # Now create task relations - for task_relation_definition in specifications_doc["task_relations"]: + for task_relation_definition in specifications_doc.get("task_relations", []): try: producer_task_draft = scheduling_unit_draft.task_drafts.get(name=task_relation_definition["producer"]) consumer_task_draft = scheduling_unit_draft.task_drafts.get(name=task_relation_definition["consumer"]) @@ -242,46 +265,31 @@ def create_task_drafts_for_scheduling_unit_draft(scheduling_unit_draft: models.S logger.error("Could not determine Task Relations for %s. Error: %s", task_relation_definition, e) raise - try: - with transaction.atomic(): - task_relation = models.TaskRelationDraft.objects.create(tags=task_relation_definition.get("tags",[]), - producer=producer_task_draft, - consumer=consumer_task_draft, - input_role=input_role, - output_role=output_role, - selection_doc=selection_doc, - selection_template=selection_template) - logger.info("created task_relation id=%s between task draft id=%s name='%s' and id=%s name='%s", - task_relation.pk, producer_task_draft.id, producer_task_draft.name, consumer_task_draft.id, consumer_task_draft.name) - except IntegrityError as e: - if 'TaskRelationDraft_unique_relation' in str(e): - logger.info("task_relation between task draft id=%s name='%s' and id=%s name='%s already exists", - producer_task_draft.id, producer_task_draft.name, consumer_task_draft.id, consumer_task_draft.name) - else: - raise - + task_relation, created = models.TaskRelationDraft.objects.update_or_create(tags=task_relation_definition.get("tags",[]), + producer=producer_task_draft, + consumer=consumer_task_draft, + input_role=input_role, + output_role=output_role, + selection_doc=selection_doc, + selection_template=selection_template) + logger.info("%s task_relation id=%s between task draft id=%s name='%s' and id=%s name='%s", + "created" if created else "updated", + task_relation.pk, producer_task_draft.id, producer_task_draft.name, consumer_task_draft.id, consumer_task_draft.name) # task_scheduling_relation - for task_scheduling_relation_definition in specifications_doc["task_scheduling_relations"]: + for task_scheduling_relation_definition in specifications_doc.get("task_scheduling_relations", []): placement = models.SchedulingRelationPlacement.objects.get(value=task_scheduling_relation_definition["placement"]) time_offset = task_scheduling_relation_definition["time_offset"] first_task_draft = scheduling_unit_draft.task_drafts.get(name=task_scheduling_relation_definition["first"]) second_task_draft = scheduling_unit_draft.task_drafts.get(name=task_scheduling_relation_definition["second"]) - try: - with transaction.atomic(): - task_scheduling_relation = models.TaskSchedulingRelationDraft.objects.create(placement=placement, - time_offset=time_offset, - first=first_task_draft, - second=second_task_draft) - logger.info("created task_scheduling_relation id=%s between task draft id=%s name='%s' and id=%s name='%s", - task_scheduling_relation.pk, first_task_draft.id, first_task_draft.name, second_task_draft.id, second_task_draft.name) - except IntegrityError as e: - if 'TaskSchedulingRelationDraft_unique_relation' in str(e): - logger.info("task_scheduling_relation between task draft id=%s name='%s' and id=%s name='%s already exists", - first_task_draft.id, first_task_draft.name, second_task_draft.id, second_task_draft.name) - else: - raise + task_scheduling_relation, created = models.TaskSchedulingRelationDraft.objects.update_or_create(placement=placement, + time_offset=time_offset, + first=first_task_draft, + second=second_task_draft) + logger.info("%s task_scheduling_relation id=%s between task draft id=%s name='%s' and id=%s name='%s", + "created" if created else "updated", + task_scheduling_relation.pk, first_task_draft.id, first_task_draft.name, second_task_draft.id, second_task_draft.name) scheduling_unit_draft.refresh_from_db() return scheduling_unit_draft diff --git a/SAS/TMSS/backend/test/t_scheduling_units.py b/SAS/TMSS/backend/test/t_scheduling_units.py index 854e66c9b5c54febe1b9e3052bcf1feafc87c54f..7e28a9e310e705a1a08f5b7920249ecc0ef8aa05 100644 --- a/SAS/TMSS/backend/test/t_scheduling_units.py +++ b/SAS/TMSS/backend/test/t_scheduling_units.py @@ -338,11 +338,13 @@ class SchedulingUnitBlueprintIndirectModificationsTestCase(unittest.TestCase): """ Various tests for TMSS-850: SchedulingUnitBlueprints should be indirectly modifiable for certain cases, which are tested here. """ - def test_scheduling_unit_draft_task_graph_editing(self): + def test_modify_draft_graph(self): + '''core test to check if the task draft graph is correctly created/updated/removed''' with tmss_test_env.create_tmss_client() as client: # start with a default scheduling_unit_template specifications_doc with no tasks scheduling_unit_template = [template for template in client.get_path_as_json_object('scheduling_unit_template') if template['name']=='scheduling unit'][0] scheduling_constraints_template = [template for template in client.get_path_as_json_object('scheduling_constraints_template') if template['name']=='constraints'][0] + selection_template = [template for template in client.get_path_as_json_object('task_relation_selection_template') if template['name']=='all'][0] base_specifications_doc = client.get_schedulingunit_template_default_specification(scheduling_unit_template['name'], scheduling_unit_template['version']) if 'scheduling_constraints_template' in base_specifications_doc: base_specifications_doc['scheduling_constraints_doc'] = client.get_url_as_json_object(scheduling_constraints_template['url']+'/default') @@ -362,9 +364,10 @@ class SchedulingUnitBlueprintIndirectModificationsTestCase(unittest.TestCase): # import a specifications_doc with a single task single_task_specifications_doc = copy.deepcopy(base_specifications_doc) - single_task_specifications_doc['tasks'] = {'Task_A': {'specifications_doc': client.get_task_template_default_specification('ingest'), - 'specifications_template': 'ingest', - 'description': "my task A"}} + single_task_specifications_doc['tasks'] = {'Task_A': {'specifications_doc': client.get_task_template_default_specification('target observation'), + 'specifications_template': 'target observation', + 'description': "my task A"} } + scheduling_unit_draft = client.create_task_drafts_for_scheduling_unit_draft(scheduling_unit_draft['id'], specifications_doc=single_task_specifications_doc) @@ -373,10 +376,81 @@ class SchedulingUnitBlueprintIndirectModificationsTestCase(unittest.TestCase): # is the exported/generated specifications_doc equal to the imported specification? self.assertEqual(single_task_specifications_doc, client.get_schedulingunit_draft_specifications_doc(scheduling_unit_draft_id)) - # # back to an empty graph with no tasks, and check results - # scheduling_unit_draft = client.create_task_drafts_for_scheduling_unit_draft(scheduling_unit_draft['id'], specifications_doc=base_specifications_doc) - # self.assertEqual(0, len(scheduling_unit_draft['task_drafts'])) - # self.assertEqual(base_specifications_doc, client.get_schedulingunit_draft_specifications_doc(scheduling_unit_draft_id)) + # modify a setting in the specification, check the task is updated + org_duration = single_task_specifications_doc['tasks']['Task_A']['specifications_doc']['duration'] + new_duration = 2 * org_duration + single_task_specifications_doc['tasks']['Task_A']['specifications_doc']['duration'] = new_duration + + scheduling_unit_draft = client.create_task_drafts_for_scheduling_unit_draft(scheduling_unit_draft['id'], + specifications_doc=single_task_specifications_doc) + updated_exported_single_task_specifications_doc = client.get_schedulingunit_draft_specifications_doc(scheduling_unit_draft_id) + self.assertEqual(single_task_specifications_doc, updated_exported_single_task_specifications_doc) + self.assertEqual(new_duration, updated_exported_single_task_specifications_doc['tasks']['Task_A']['specifications_doc']['duration']) + + + # back to an empty graph with no tasks, and check results + scheduling_unit_draft = client.create_task_drafts_for_scheduling_unit_draft(scheduling_unit_draft['id'], specifications_doc=base_specifications_doc) + self.assertEqual(0, len(scheduling_unit_draft['task_drafts'])) + self.assertEqual(base_specifications_doc, client.get_schedulingunit_draft_specifications_doc(scheduling_unit_draft_id)) + + + # import a specifications_doc with a dual task and relation + dual_task_specifications_doc = copy.deepcopy(base_specifications_doc) + dual_task_specifications_doc['tasks'] = {'Task_A': {'specifications_doc': client.get_task_template_default_specification('target observation'), + 'specifications_template': 'target observation', + 'description': "my task A"}, + 'Task_B': {'specifications_doc': client.get_task_template_default_specification('ingest'), + 'specifications_template': 'ingest', + 'description': "my task B"} } + dual_task_specifications_doc['task_relations'] = [{ "input": { "role": "any", + "datatype": "visibilities", + "dataformat": "MeasurementSet" }, + "output": { "role": "correlator", + "datatype": "visibilities", + "dataformat": "MeasurementSet" }, + "producer": "Task_A", + "consumer": "Task_B", + "selection_doc": client.get_url_as_json_object(selection_template['url']+'/default'), + "selection_template": "all" }] + scheduling_unit_draft = client.create_task_drafts_for_scheduling_unit_draft(scheduling_unit_draft['id'], + specifications_doc=dual_task_specifications_doc) + self.assertEqual(2, len(scheduling_unit_draft['task_drafts'])) + self.assertEqual(dual_task_specifications_doc, client.get_schedulingunit_draft_specifications_doc(scheduling_unit_draft_id)) + + # check updates on settings + updated_dual_task_specifications_doc = copy.deepcopy(dual_task_specifications_doc) + updated_dual_task_specifications_doc['tasks']['Task_A']['specifications_doc']['duration'] *= 2 + + scheduling_unit_draft = client.create_task_drafts_for_scheduling_unit_draft(scheduling_unit_draft['id'], + specifications_doc=updated_dual_task_specifications_doc) + self.assertEqual(2, len(scheduling_unit_draft['task_drafts'])) + self.assertEqual(updated_dual_task_specifications_doc, client.get_schedulingunit_draft_specifications_doc(scheduling_unit_draft_id)) + + # check updates on task graph + updated_dual_task_specifications_doc['tasks']['Task_C'] = {'specifications_doc': client.get_task_template_default_specification('cleanup'), + 'specifications_template': 'cleanup', + 'description': "my task C"} + updated_dual_task_specifications_doc['task_relations'].append({ "input": { "role": "any", + "datatype": "visibilities", + "dataformat": "MeasurementSet" }, + "output": { "role": "correlator", + "datatype": "visibilities", + "dataformat": "MeasurementSet" }, + "producer": "Task_A", + "consumer": "Task_C", + "selection_doc": client.get_url_as_json_object(selection_template['url']+'/default'), + "selection_template": "all" }) + + scheduling_unit_draft = client.create_task_drafts_for_scheduling_unit_draft(scheduling_unit_draft['id'], + specifications_doc=updated_dual_task_specifications_doc) + self.assertEqual(3, len(scheduling_unit_draft['task_drafts'])) + self.assertEqual(updated_dual_task_specifications_doc, client.get_schedulingunit_draft_specifications_doc(scheduling_unit_draft_id)) + + + # back to an empty graph with no tasks, and check results + scheduling_unit_draft = client.create_task_drafts_for_scheduling_unit_draft(scheduling_unit_draft['id'], specifications_doc=base_specifications_doc) + self.assertEqual(0, len(scheduling_unit_draft['task_drafts'])) + self.assertEqual(base_specifications_doc, client.get_schedulingunit_draft_specifications_doc(scheduling_unit_draft_id)) if __name__ == "__main__":