diff --git a/SAS/TMSS/backend/services/scheduling/lib/constraints/__init__.py b/SAS/TMSS/backend/services/scheduling/lib/constraints/__init__.py index 4b46cc74036b44625b0df5d88c1d03d07c6c8449..835ada47d5752913579e2bbad9514981a993f764 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/constraints/__init__.py +++ b/SAS/TMSS/backend/services/scheduling/lib/constraints/__init__.py @@ -86,7 +86,7 @@ def filter_scheduling_units_using_constraints(scheduling_units: [models.Scheduli # For example, the user can choose a different template, # or submit a feature request to implement constraint solvers for this new template. logger.warning(e) - for subtask in models.Subtask.independent_subtasks().filter(task_blueprint__scheduling_unit_blueprint_id=scheduling_unit.id).all(): + for subtask in models.Subtask.independent_subtasks().filter(task_blueprints__scheduling_unit_blueprint_id=scheduling_unit.id).all(): subtask.status = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.ERROR.value) subtask.save() @@ -151,7 +151,7 @@ def sort_scheduling_units_scored_by_constraints(scheduling_units: [models.Schedu # For example, the user can choose a different template, # or submit a feature request to implement constraint solvers for this new template. 
logger.warning(e) - for subtask in models.Subtask.independent_subtasks().filter(task_blueprint__scheduling_unit_blueprint_id=scheduling_unit.id).all(): + for subtask in models.Subtask.independent_subtasks().filter(task_blueprints__scheduling_unit_blueprint_id=scheduling_unit.id).all(): subtask.status = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.ERROR.value) subtask.save() diff --git a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py index d1c19a905a90b58cb65b363cc8bd095f5fab885e..c8ff0c4c2a7a0517243aa4ef9444db39161c9f51 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py @@ -296,7 +296,7 @@ def create_dynamic_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str def get_dynamically_schedulable_scheduling_units() -> [models.SchedulingUnitBlueprint]: '''get a list of all dynamically schedulable scheduling_units''' defined_independend_subtasks = models.Subtask.independent_subtasks().filter(state__value='defined') - defined_independend_subtask_ids = defined_independend_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct().all() + defined_independend_subtask_ids = defined_independend_subtasks.values('task_blueprints__scheduling_unit_blueprint_id').distinct().all() scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=defined_independend_subtask_ids) \ .filter(draft__scheduling_constraints_template__isnull=False) \ .select_related('draft', 'draft__scheduling_constraints_template').all() @@ -310,7 +310,7 @@ def get_scheduled_scheduling_units(lower:datetime=None, upper:datetime=None) -> scheduled_subtasks = scheduled_subtasks.filter(stop_time__gte=lower) if upper is not None: scheduled_subtasks = scheduled_subtasks.filter(start_time__lte=upper) - return 
list(models.SchedulingUnitBlueprint.objects.filter(id__in=scheduled_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct()).all()) + return list(models.SchedulingUnitBlueprint.objects.filter(id__in=scheduled_subtasks.values('task_blueprints__scheduling_unit_blueprint_id').distinct()).all()) def unschededule_blocking_scheduled_units_if_needed_and_possible(candidate: ScoredSchedulingUnit) -> bool: diff --git a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py index bcd9f1fb6aa1d3dbbed8334c186dd3f53cb1e161..d41c6ca15518b114a9a6ff4472bbf8ad246a3881 100755 --- a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py @@ -236,11 +236,11 @@ class TestDynamicScheduling(TestCase): # Note: we use django.test.TestCase inst # check the scheduled subtask upcoming_scheduled_subtasks = models.Subtask.objects.filter(state__value='scheduled', - task_blueprint__scheduling_unit_blueprint__in=(scheduling_unit_blueprint_low, + task_blueprints__scheduling_unit_blueprint__in=(scheduling_unit_blueprint_low, scheduling_unit_blueprint_medium, scheduling_unit_blueprint_high)).all() self.assertEqual(1, upcoming_scheduled_subtasks.count()) - self.assertEqual(scheduling_unit_blueprint_high.id, upcoming_scheduled_subtasks[0].task_blueprint.scheduling_unit_blueprint.id) + self.assertEqual(scheduling_unit_blueprint_high.id, upcoming_scheduled_subtasks[0].task_blueprints().first().scheduling_unit_blueprint.id) # all task blueprints share same SU, so it does not matter which one we check # check scheduling_unit_blueprint_low starts after the scheduled scheduling_unit_blueprint_high self.assertGreater(scheduling_unit_blueprint_low.start_time, scheduling_unit_blueprint_medium.start_time) diff --git a/SAS/TMSS/backend/services/scheduling/test/t_subtask_scheduling_service.py 
b/SAS/TMSS/backend/services/scheduling/test/t_subtask_scheduling_service.py index 57d3ca6f86bbc6ab3b9e5d5a7de7c051e75e2650..a8b86bb1cd4b7249a8aa89ef9c4ab96c5a386452 100755 --- a/SAS/TMSS/backend/services/scheduling/test/t_subtask_scheduling_service.py +++ b/SAS/TMSS/backend/services/scheduling/test/t_subtask_scheduling_service.py @@ -106,7 +106,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): # create two subtasks subtask1 = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.Subtask(state="defined", specifications_template_url=subtask_template_url), '/subtask/') - subtask2 = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.Subtask(state="defined", specifications_template_url=subtask_template_url, task_blueprint_url=subtask1['task_blueprint']), '/subtask/') + subtask2 = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.Subtask(state="defined", specifications_template_url=subtask_template_url, task_blueprint_urls=subtask1['task_blueprints']), '/subtask/') # connect them output_url = self.test_data_creator.post_data_and_get_url(self.test_data_creator.SubtaskOutput(subtask1['url']), '/subtask_output/') diff --git a/SAS/TMSS/backend/services/tmss_postgres_listener/lib/tmss_postgres_listener.py b/SAS/TMSS/backend/services/tmss_postgres_listener/lib/tmss_postgres_listener.py index 93e5c5a7f113a7f39b6838aeff20d489137ef96b..606f173725084631845ca5ffc3029696f3203f15 100644 --- a/SAS/TMSS/backend/services/tmss_postgres_listener/lib/tmss_postgres_listener.py +++ b/SAS/TMSS/backend/services/tmss_postgres_listener/lib/tmss_postgres_listener.py @@ -184,14 +184,15 @@ class TMSSPGListener(PostgresListener): # ... 
and also send status change and object update events for the parent task, and schedulingunit, # because their status is implicitly derived from their subtask(s) # send both object.updated and status change events - self.onTaskBlueprintUpdated( {'id': subtask.task_blueprint.id}) - self._sendNotification(TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX+'.'+subtask.task_blueprint.status.capitalize(), - {'id': subtask.task_blueprint.id, 'status': subtask.task_blueprint.status}) - - self.onSchedulingUnitBlueprintUpdated( {'id': subtask.task_blueprint.scheduling_unit_blueprint.id}) - self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.'+subtask.task_blueprint.scheduling_unit_blueprint.status.capitalize(), - {'id': subtask.task_blueprint.scheduling_unit_blueprint.id, 'status': subtask.task_blueprint.scheduling_unit_blueprint.status}) - + for td in subtask.task_blueprints.all(): + self.onTaskBlueprintUpdated( {'id': td.id}) + self._sendNotification(TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX+'.'+td.status.capitalize(), + {'id': td.id, 'status': td.status}) + + self.onSchedulingUnitBlueprintUpdated( {'id': td.scheduling_unit_blueprint.id}) + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.'+td.scheduling_unit_blueprint.status.capitalize(), + {'id': td.scheduling_unit_blueprint.id, 'status': td.scheduling_unit_blueprint.status}) + def onTaskBlueprintInserted(self, payload = None): self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', payload) diff --git a/SAS/TMSS/backend/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py b/SAS/TMSS/backend/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py index 37fbe82b303bd9f2a3e8246c7f98daf29273e33d..5037782678f81bc525984af256babd3b35cb24df 100755 --- a/SAS/TMSS/backend/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py +++ b/SAS/TMSS/backend/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py @@ 
-135,7 +135,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): # create a SubTask - subtask = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.Subtask(task_blueprint_url=task_blueprint['url']), '/subtask/') + subtask = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.Subtask(task_blueprint_urls=[task_blueprint['url']]), '/subtask/') # sync and check with service.lock: diff --git a/SAS/TMSS/backend/services/websocket/test/t_websocket_service.py b/SAS/TMSS/backend/services/websocket/test/t_websocket_service.py index f3f8388cb9b361665964ba3660f926b2653bbfc0..37eeddf21e5fabb2b3008e3a3efb272674ddbfa3 100755 --- a/SAS/TMSS/backend/services/websocket/test/t_websocket_service.py +++ b/SAS/TMSS/backend/services/websocket/test/t_websocket_service.py @@ -152,7 +152,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): # Test subtask create subtask = self.test_data_creator.post_data_and_get_response_as_json_object( - self.test_data_creator.Subtask(task_blueprint_url=task_blueprint['url']), '/subtask/') + self.test_data_creator.Subtask(task_blueprint_urls=[task_blueprint['url']]), '/subtask/') test_object(subtask, self.ObjTypes.SUBTASK, self.ObjActions.CREATE) # Test updates diff --git a/SAS/TMSS/backend/src/migrate_momdb_to_tmss.py b/SAS/TMSS/backend/src/migrate_momdb_to_tmss.py index 4d38aff956611eb3b0f0d5044fbffa32994c9742..a77af99efa8693c76fcaee0f43537d65bdea0848 100755 --- a/SAS/TMSS/backend/src/migrate_momdb_to_tmss.py +++ b/SAS/TMSS/backend/src/migrate_momdb_to_tmss.py @@ -506,7 +506,7 @@ def create_subtask_trees_for_project_in_momdb(project_mom2id, project): details = {"id": mom_details['mom2id'], "state": state, "specifications_doc": {}, # todo: where from? We have user_specification_id (for task?) and system_specification_id (for subtask?) on lofar_observation (I guess referring to lofar_observation_specification). 
Shall we piece things together from that, or is there a text blob to use? Also: pipeline info lives in obs_spec too? - "task_blueprint": task_blueprint, + #"task_blueprint": task_blueprint, # ManyToMany, use set() "specifications_template": specifications_template, "tags": ["migrated_from_MoM", "migration_incomplete"], # todo: set complete once it is verified that all info is present "priority": project.priority_rank, # todo: correct to derive from project? @@ -523,11 +523,13 @@ def create_subtask_trees_for_project_in_momdb(project_mom2id, project): if subtask_qs.count(): # todo: this will update the subtask, but other TMSS objects do not share id with MoM and get recreated with every migration run. Can we clean this up somehow? subtask_qs.update(**details) + subtask_qs.task_blueprints.set([task_blueprint]) subtask = subtask_qs.first() logger.info("...updated existing subtask tmss id=%s" % subtask.id) stats['subtasks_updated'] += 1 else: subtask = models.Subtask.objects.create(**details) + subtask.task_blueprints.set([task_blueprint]) logger.info("...created new subtask tmss id=%s" % subtask.id) stats['subtasks_created'] += 1 diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py index 313aaf8090155c185fcc8ee7b62243dd52c8f74b..ac448421eb78ec6e309903a4e5c45341cf2ba003 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py @@ -309,7 +309,10 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas parset["Observation.tmssID"] = subtask.pk parset["Observation.processType"] = subtask.specifications_template.type.value.capitalize() parset["Observation.processSubtype"] = "Beam Observation" - parset["Observation.Campaign.name"] = subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project.name + project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in 
subtask.task_blueprints.all()]) + if len(project_set) != 1: + raise ConversionException('Subtask id=%s cannot be converted to parset because it references task blueprint that belong to different projects=%s' % (subtask.id, project_set)) + parset["Observation.Campaign.name"] = list(project_set)[0] parset["Observation.startTime"] = formatDatetime(subtask.start_time) if isinstance(subtask.start_time, datetime) else subtask.start_time parset["Observation.stopTime"] = formatDatetime(subtask.stop_time) if isinstance(subtask.stop_time, datetime) else subtask.stop_time parset["Observation.strategy"] = "default" # maybe not mandatory? @@ -429,8 +432,11 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) parset["Observation.processSubtype"] = "Averaging Pipeline" parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "preprocessing_pipeline.py" parset["Observation.ObservationControl.PythonControl.softwareVersion"] = "" - parset["Observation.Campaign.name"] = subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project.name - parset["Observation.Scheduler.taskName"] = subtask.task_blueprint.name + project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in subtask.task_blueprints.all()]) + if len(project_set) != 1: + raise ConversionException('Subtask pk=%s cannot be converted to parset because it references task blueprint that belong to different projects (names=%s)' % (subtask.pk, project_set)) + parset["Observation.Campaign.name"] = list(project_set)[0] + parset["Observation.Scheduler.taskName"] = subtask.task_blueprints.first().name # Scheduler keys are artefacts of an older time. 
Their content is deprecated, so we don't care whch task we take this from parset["Observation.Scheduler.predecessors"] = [] parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster.name parset["Observation.Cluster.ProcessingCluster.clusterPartition"] = 'cpu' diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py index a48351c71b22a4b5a8335b81dbcba56eb7bb144a..570e8b9c7663a7c5878bdaf89f154235cf04e1a9 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py @@ -144,7 +144,8 @@ def create_sip_representation_for_subtask(subtask: Subtask): # determine subtask specific properties and add subtask representation to Sip object if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: - subarraypointings=None # todo, subtask.specifications_doc, probably more complex than it looks -> RGOE yes complex type for later -> JK: assuming this is done in TMSS-308? + subarraypointings = None # todo, subtask.specifications_doc, probably more complex than it looks -> RGOE yes complex type for later -> JK: assuming this is done in TMSS-308? + concatenated_task_descriptions = "\n".join([tb.description for tb in subtask.task_blueprints.order_by("specifications_template__name").all()]) # we could also order by "specifications_template__type__value"? 
observation = siplib.Observation(observingmode=constants.OBSERVINGMODETYPE_BEAM_OBSERVATION, # can be hardcoded for an observation instrumentfilter=mapping_filterset_type_TMSS_2_SIP[subtask.specifications_doc['stations']['filter']], clock_frequency="200", # fixed, @@ -162,7 +163,7 @@ def create_sip_representation_for_subtask(subtask: Subtask): process_map=process_map, channelwidth_frequency=None, # NA any more ('BlueGene compatibility' see comment in LTA-SIP.xsd) channelwidth_frequencyunit=constants.FREQUENCYUNIT_HZ, # fixed - observationdescription=subtask.task_blueprint.description, + observationdescription=concatenated_task_descriptions, channelspersubband=0, # NA any more ('BlueGene compatibility' see comment in LTA-SIP.xsd) subarraypointings=subarraypointings, transientbufferboardevents=None # fixed @@ -175,9 +176,11 @@ def create_sip_representation_for_subtask(subtask: Subtask): sourcedata_identifiers += [get_siplib_identifier(dp.global_identifier, "Dataproduct id=%s" % dp.id) for dp in input.dataproducts.all()] # todo: use correct id, lookup based on TMSS reference or so, tbd if not sourcedata_identifiers: raise TMSSException("There seems to be no subtask input associated to your pipeline subtask id %s. Please define what data the pipeline processed." % subtask.id) + if subtask.task_blueprints.count() > 1: + raise TMSSException("There are several task blueprints pk=%s associated to subtask pk=%s, but for pipelines, only a single task is supported." % ([tb.pk for tb in subtask.task_blueprints.all()], subtask.pk)) pipeline_map = siplib.PipelineMap( - name=subtask.task_blueprint.name, + name=subtask.task_blueprints.first().name, # there is only one version='unknown', # todo from subtask.specifications_doc? from feedback (we have feedback and storagewriter versions there, not pipeline version or sth)? 
sourcedata_identifiers=sourcedata_identifiers, process_map=process_map) @@ -280,7 +283,7 @@ def create_sip_representation_for_dataproduct(dataproduct: Dataproduct): logger.warning("Could not determine the type of dataproduct id %s (%s). Falling back to %s" % (dataproduct.id, err, dataproduct_type)) try: - dataproduct_fileformat = fileformat_map[dataproduct.producer.subtask.task_blueprint.consumed_by.first().dataformat.value] # todo same as with type? Why is this not with the data? Why is this so different from the LTA datamodel? + dataproduct_fileformat = fileformat_map[dataproduct.dataformat.value] # todo same as with type? Why is this not with the data? Why is this so different from the LTA datamodel? except Exception as err: dataproduct_fileformat = constants.FILEFORMATTYPE_UNDOCUMENTED logger.warning("Could not determine the type of dataproduct id %s (%s). Falling back to %s" % (dataproduct.id, err, dataproduct_fileformat)) @@ -465,7 +468,11 @@ def generate_sip_for_dataproduct(dataproduct: Dataproduct) -> siplib.Sip: sip_dataproduct = create_sip_representation_for_dataproduct(dataproduct) # Gather project details - project = dataproduct.producer.subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project + project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in dataproduct.producer.subtask.task_blueprints.all()]) + if len(project_set) != 1: + # todo: support for multiple projects needs to be picked up in TMSS-689 + raise TMSSException('Dataproduct pk=%s references task blueprints that belong to different projects (names=%s). This can currently not be represented in SIP format.' 
% (dataproduct.pk, project_set)) + project = dataproduct.producer.subtask.task_blueprints.first().scheduling_unit_blueprint.draft.scheduling_set.project # there must be only one task blueprint project_code = project.name project_primaryinvestigator = 'project_primaryinvestigator' project_contactauthor = 'project_contactauthor' diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py index b839825872027682578453db8cbf18389a60adcb..569127f8d49da529f086ddbeafbe1d8bfe54943e 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py @@ -1108,6 +1108,11 @@ class Migration(migrations.Migration): name='subtask', field=models.ForeignKey(help_text='Subtask to which this output specification refers.', on_delete=django.db.models.deletion.CASCADE, related_name='outputs', to='tmssapp.Subtask'), ), + migrations.AddField( + model_name='subtaskoutput', + name='task_blueprint', + field=models.ForeignKey(help_text='Task to which this output specification refers.', on_delete=django.db.models.deletion.CASCADE, related_name='outputs', to='tmssapp.TaskBlueprint'), + ), migrations.AddField( model_name='subtaskinput', name='dataproducts', @@ -1160,8 +1165,8 @@ class Migration(migrations.Migration): ), migrations.AddField( model_name='subtask', - name='task_blueprint', - field=models.ForeignKey(help_text='Task Blueprint to which this Subtask belongs.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='subtasks', to='tmssapp.TaskBlueprint'), + name='task_blueprints', + field=models.ManyToManyField(blank=True, help_text='Task Blueprint to which this Subtask belongs.', related_name='subtasks', to='tmssapp.TaskBlueprint'), ), migrations.AddConstraint( model_name='stationtimeline', diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py index 
234f8ab359cd7dad4c3bb3abcad5d0e4dabc4cd8..9e9a1cbf634f75d353d148c6efc0ed924ed3e619 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py @@ -147,7 +147,7 @@ class Subtask(BasicCommon): stop_time = DateTimeField(null=True, help_text='Stop this subtask at the specified time (NULLable).') state = ForeignKey('SubtaskState', null=False, on_delete=PROTECT, related_name='task_states', help_text='Subtask state (see Subtask State Machine).') specifications_doc = JSONField(help_text='Final specifications, as input for the controller.') - task_blueprint = ForeignKey('TaskBlueprint', related_name='subtasks', null=True, on_delete=SET_NULL, help_text='Task Blueprint to which this Subtask belongs.') + task_blueprints = ManyToManyField('TaskBlueprint', related_name='subtasks', blank=True, help_text='Task Blueprint to which this Subtask belongs.') specifications_template = ForeignKey('SubtaskTemplate', null=False, on_delete=PROTECT, help_text='Schema used for specifications_doc.') do_cancel = DateTimeField(null=True, help_text='Timestamp when the subtask has been ordered to cancel (NULLable).') cluster = ForeignKey('Cluster', null=True, on_delete=PROTECT, help_text='Where the Subtask is scheduled to run (NULLable).') @@ -174,11 +174,13 @@ class Subtask(BasicCommon): '''get the specified (or estimated) duration of this subtask based on the specified task duration and the subtask type''' if self.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: # observations have a specified duration, so grab it from the spec. - return timedelta(seconds=self.task_blueprint.specifications_doc.get('duration', 0)) + # In case we have several associated tasks: use the longest duration, since we assume that tasks will run in parallel (there would be no reason to combine them into a subtask). 
+ return timedelta(seconds=max([tb.specifications_doc.get('duration', 0) for tb in self.task_blueprints.all()])) if self.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: # pipelines usually do not have a specified duration, so make a guess (half the obs duration?). - return timedelta(seconds=self.task_blueprint.specifications_doc.get('duration', max(p.specified_duration.total_seconds() for p in self.predecessors)/2)) + # In case we have several associated tasks: this guess is probably in no way accurate anyway, so we assume it does not really matter which task blueprint we refer to here + return timedelta(seconds=self.task_blueprints.first().specifications_doc.get('duration', max(p.specified_duration.total_seconds() for p in self.predecessors)/2)) # other subtasktypes usually depend on cpu/data/network etc. So, make a guess (for now) return timedelta(minutes=5) @@ -340,6 +342,7 @@ class SubtaskInput(BasicCommon): class SubtaskOutput(BasicCommon): subtask = ForeignKey('Subtask', null=False, on_delete=CASCADE, related_name='outputs', help_text='Subtask to which this output specification refers.') + task_blueprint = ForeignKey('TaskBlueprint', null=False, on_delete=CASCADE, related_name='outputs', help_text='Task to which this output specification refers.') class SAP(BasicCommon): diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 5c1513c829161770f6a6a8101976cbb03d0f5537..4bb7ee107199e1023d9c6e00df696cfccaf71d85 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -163,7 +163,7 @@ def create_observation_subtask_specifications_from_observation_task_blueprint(ta # So... copy the calibrator specs first, then loop over the shared target/calibrator specs... 
if 'calibrator' in task_blueprint.specifications_template.name.lower(): # Calibrator requires related Target Task Observation for some specifications - target_task_blueprint = get_related_target_observation_task_blueprint(task_blueprint) + target_task_blueprint, _ = get_related_target_observation_task_blueprint(task_blueprint) if target_task_blueprint is None: raise SubtaskCreationException("Cannot create calibrator observation subtask specifications from task_blueprint id=%s with template name='%s' because no related target observation task_blueprint is found" % (task_blueprint.id, task_blueprint.specifications_template.name)) target_task_spec = target_task_blueprint.specifications_doc @@ -270,7 +270,7 @@ def create_observation_subtask_specifications_from_observation_task_blueprint(ta "coherent": True, # determine absolute tab pointing for subtask by adding relative tab pointing from task to target sap pointing "pointing": tab["pointing"] if not tab.get("relative", False) else _add_pointings(tab['pointing'], target_sap['digital_pointing']) - }) + }) if "tab_rings" in sap: ring_pointings = _generate_tab_ring_pointings(target_sap["digital_pointing"], sap.pop("tab_rings")) @@ -384,9 +384,21 @@ def get_stations_in_group(station_group_name: str) -> []: return sorted(list(station_names)) -def get_related_target_observation_task_blueprint(calibrator_or_beamformer_task_blueprint: TaskBlueprint) -> TaskBlueprint: +def get_related_calibrator_observation_task_blueprint(target_task_blueprint: TaskBlueprint) -> (TaskBlueprint, SchedulingRelationPlacement): """ - get the related target observation task_blueprint for the given calibrator or beamformer task_blueprint + get the related calibrator observation task_blueprint and the relative placement for the given target task_blueprint + if nothing found return None + """ + if 'target' not in target_task_blueprint.specifications_template.name.lower(): + raise ValueError("Cannot get a related calibrator observation task_blueprint 
for non-target task_blueprint id=%s template_name='%s'", + target_task_blueprint.id, target_task_blueprint.specifications_template.name) + + return _get_related_observation_task_blueprint(target_task_blueprint, 'calibrator observation') + + +def get_related_target_observation_task_blueprint(calibrator_or_beamformer_task_blueprint: TaskBlueprint) -> (TaskBlueprint, SchedulingRelationPlacement): + """ + get the related target observation task_blueprint and the relative placement for the given calibrator or beamformer task_blueprint if nothing found return None """ if 'calibrator' not in calibrator_or_beamformer_task_blueprint.specifications_template.name.lower() and \ @@ -394,17 +406,21 @@ def get_related_target_observation_task_blueprint(calibrator_or_beamformer_task_ raise ValueError("Cannot get a related target observation task_blueprint for non-calibrator/beamformer task_blueprint id=%s template_name='%s'", calibrator_or_beamformer_task_blueprint.id, calibrator_or_beamformer_task_blueprint.specifications_template.name) + return _get_related_observation_task_blueprint(calibrator_or_beamformer_task_blueprint, 'target observation') + + +def _get_related_observation_task_blueprint(task_blueprint: TaskBlueprint, related_template_name: str) -> (TaskBlueprint, SchedulingRelationPlacement): try: - return next(relation.second for relation in TaskSchedulingRelationBlueprint.objects.filter(first=calibrator_or_beamformer_task_blueprint).all() - if relation.second is not None and relation.second.specifications_template.name.lower() == 'target observation') + return next((relation.second, relation.placement) for relation in TaskSchedulingRelationBlueprint.objects.filter(first=task_blueprint).all() + if relation.second is not None and relation.second.specifications_template.name.lower() == related_template_name) except StopIteration: try: - return next(relation.first for relation in 
TaskSchedulingRelationBlueprint.objects.filter(second=calibrator_or_beamformer_task_blueprint).all() - if relation.first is not None and relation.first.specifications_template.name.lower() == 'target observation') + return next((relation.first, relation.placement) for relation in TaskSchedulingRelationBlueprint.objects.filter(second=task_blueprint).all() + if relation.first is not None and relation.first.specifications_template.name.lower() == related_template_name) except StopIteration: - logger.info("No related target observation task_blueprint found for calibrator/beamformer observation task_blueprint id=%d", calibrator_or_beamformer_task_blueprint.id) + logger.info("No related %s task_blueprint found for task_blueprint id=%d", related_template_name, task_blueprint.id) - return None + return None, None def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask: @@ -423,19 +439,45 @@ def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskB "stop_time": None, "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value), "specifications_doc": specifications_doc, - "task_blueprint": task_blueprint, + #"task_blueprint": task_blueprint, # ManyToMany, so use set()! "specifications_template": subtask_template, "tags": [], "cluster": Cluster.objects.get(name=cluster_name) } - subtask = Subtask.objects.create(**subtask_data) + + # If we deal with a calibrator obs that runs in parallel to a target observation, add the calibrator beam to the + # existing target obs subtask. 
+ subtask = None + if 'calibrator' in task_blueprint.specifications_template.name.lower(): + related_task_blueprint, relation = get_related_target_observation_task_blueprint(task_blueprint) + if relation and relation.value == 'parallel': + # add calibrator beam + subtask = related_task_blueprint.subtasks.filter(specifications_template__type__value=SubtaskType.Choices.OBSERVATION.value).first() + if not subtask: + raise SubtaskCreationException('Calibrator observation cannot be added to the target subtask, because it does not exist. Make sure to create a subtask from the target observation task id=%s first.' % related_task_blueprint.id) + subtask.specifications_doc['stations']['digital_pointings'] += subtask_data['specifications_doc']['stations']['digital_pointings'] + # check that the additional beam fits into the spec (observation must not result in >488 subbands across all beams) + total_subbands = sum([len(digital_pointing['subbands']) for digital_pointing in subtask.specifications_doc['stations']['digital_pointings']]) + if total_subbands > 488: # todo: should this be better handled in JSON? 
+ raise SubtaskCreationException('Calibrator beam does not fit into the spec (results in %s total subbands, but only 488 are possible)' % total_subbands) + + if not subtask: + subtask = Subtask.objects.create(**subtask_data) + subtask.task_blueprints.set(list(subtask.task_blueprints.all()) + [task_blueprint]) # step 2: create and link subtask input/output # an observation has no input, it just produces output data - subtask_output = SubtaskOutput.objects.create(subtask=subtask) - - # step 3: set state to DEFINED - subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) + subtask_output = SubtaskOutput.objects.create(subtask=subtask, + task_blueprint=task_blueprint) + + # step 3: set state to DEFINED, unless we have a target obs with a related parallel calibrator obs + defined = True + if 'target' in task_blueprint.specifications_template.name.lower(): + _, relation = get_related_calibrator_observation_task_blueprint(task_blueprint) + if relation and relation.value == 'parallel': + defined = False + if defined: + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) subtask.save() return subtask @@ -456,7 +498,8 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) https://support.astron.nl/confluence/display/TMSS/Specification+Flow ''' # step 0: check pre-requisites - check_prerequities_for_subtask_creation(observation_subtask.task_blueprint) + for tb in observation_subtask.task_blueprints.all(): + check_prerequities_for_subtask_creation(tb) if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value: raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % ( @@ -484,20 +527,26 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) qafile_subtask_data = { "start_time": None, "stop_time": None, "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value), - 
"task_blueprint": observation_subtask.task_blueprint, + #"task_blueprint": observation_subtask.task_blueprint, # ManyToMany, use set() "specifications_template": qafile_subtask_template, "specifications_doc": qafile_subtask_spec, "cluster": observation_subtask.cluster} qafile_subtask = Subtask.objects.create(**qafile_subtask_data) + qafile_subtask.task_blueprints.set(observation_subtask.task_blueprints.all()) # step 2: create and link subtask input/output selection_template = TaskRelationSelectionTemplate.objects.get(name="all") selection_doc = get_default_json_object_for_schema(selection_template.schema) - qafile_subtask_input = SubtaskInput.objects.create(subtask=qafile_subtask, - producer=observation_subtask.outputs.first(), # TODO: determine proper producer based on spec in task_relation_blueprint - selection_doc=selection_doc, - selection_template=selection_template) - qafile_subtask_output = SubtaskOutput.objects.create(subtask=qafile_subtask) + + for obs_out in observation_subtask.outputs.all(): + qafile_subtask_input = SubtaskInput.objects.create(subtask=qafile_subtask, + producer=obs_out, # TODO: determine proper producer based on spec in task_relation_blueprint + selection_doc=selection_doc, + selection_template=selection_template) + + for tb in observation_subtask.task_blueprints.all(): + qafile_subtask_output = SubtaskOutput.objects.create(subtask=qafile_subtask, + task_blueprint=tb) # step 3: set state to DEFINED qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) @@ -510,7 +559,7 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) def create_qaplots_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask: if 'calibrator' in task_blueprint.specifications_template.name.lower(): # Calibrator requires related Target Task Observation for some specifications - target_task_blueprint = get_related_target_observation_task_blueprint(task_blueprint) + target_task_blueprint, _ = 
get_related_target_observation_task_blueprint(task_blueprint) if target_task_blueprint is None: raise SubtaskCreationException("Cannot retrieve specifications for task id=%d because no related target observation is found " % task.pk) else: @@ -534,7 +583,8 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta https://support.astron.nl/confluence/display/TMSS/Specification+Flow ''' # step 0: check pre-requisites - check_prerequities_for_subtask_creation(qafile_subtask.task_blueprint) + for tb in qafile_subtask.task_blueprints.all(): + check_prerequities_for_subtask_creation(tb) if qafile_subtask.specifications_template.type.value != SubtaskType.Choices.QA_FILES.value: raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % ( @@ -558,11 +608,12 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta qaplots_subtask_data = { "start_time": None, "stop_time": None, "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value), - "task_blueprint": qafile_subtask.task_blueprint, + #"task_blueprint": qafile_subtask.task_blueprint, "specifications_template": qaplots_subtask_template, "specifications_doc": qaplots_subtask_spec_doc, "cluster": qafile_subtask.cluster} qaplots_subtask = Subtask.objects.create(**qaplots_subtask_data) + qaplots_subtask.task_blueprints.set(qafile_subtask.task_blueprints.all()) # step 2: create and link subtask input/output selection_template = TaskRelationSelectionTemplate.objects.get(name="all") @@ -571,7 +622,10 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta producer=qafile_subtask.outputs.first(), selection_doc=selection_doc, selection_template=selection_template) - qaplots_subtask_output = SubtaskOutput.objects.create(subtask=qaplots_subtask) + + for tb in qafile_subtask.task_blueprints.all(): + qaplots_subtask_output = SubtaskOutput.objects.create(subtask=qaplots_subtask, + task_blueprint=tb) # 
step 3: set state to DEFINED qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) @@ -604,11 +658,12 @@ def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBluepri subtask_data = { "start_time": None, "stop_time": None, "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value), - "task_blueprint": task_blueprint, + #"task_blueprint": task_blueprint, # ManyToMany, so use set()! "specifications_template": subtask_template, "specifications_doc": subtask_specs, "cluster": Cluster.objects.get(name=cluster_name) } subtask = Subtask.objects.create(**subtask_data) + subtask.task_blueprints.set([task_blueprint]) # step 2: create and link subtask input/output for task_relation_blueprint in task_blueprint.produced_by.all(): @@ -622,7 +677,8 @@ def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBluepri producer=predecessor_subtask_output, selection_doc=task_relation_blueprint.selection_doc, selection_template=task_relation_blueprint.selection_template) - subtask_output = SubtaskOutput.objects.create(subtask=subtask) + subtask_output = SubtaskOutput.objects.create(subtask=subtask, + task_blueprint=task_blueprint) # step 3: set state to DEFINED subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) @@ -648,11 +704,12 @@ def create_ingest_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> subtask_data = {"start_time": None, "stop_time": None, "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value), - "task_blueprint": task_blueprint, + #"task_blueprint": task_blueprint, # ManyToMany, so use set()! 
"specifications_template": subtask_template, "specifications_doc": subtask_specs, "cluster": Cluster.objects.get(name=cluster_name)} subtask = Subtask.objects.create(**subtask_data) + subtask.task_blueprints.set([task_blueprint]) # step 2: create and link subtask input for task_relation_blueprint in task_blueprint.produced_by.all(): @@ -771,7 +828,7 @@ def update_subtasks_start_times_for_scheduling_unit(scheduling_unit: SchedulingU for task_blueprint in scheduling_unit.task_blueprints.all(): defined_independend_subtasks = task_blueprint.subtasks.filter(state__value='defined').filter(inputs=None).all() for subtask in defined_independend_subtasks: - update_start_time_and_shift_successors_until_after_stop_time(subtask, start_time + subtask.task_blueprint.relative_start_time) + update_start_time_and_shift_successors_until_after_stop_time(subtask, start_time + min([tb.relative_start_time for tb in subtask.task_blueprints.all()])) # todo: min is correct here? def update_start_time_and_shift_successors_until_after_stop_time(subtask: Subtask, start_time: datetime): @@ -789,13 +846,22 @@ def shift_successors_until_after_stop_time(subtask: Subtask): # ... but adjust it if there is a scheduling_relation with an offset. # so, check if these successive subtasks have different task_blueprint parents - if subtask.task_blueprint.id != successor.task_blueprint.id: - relations = (TaskSchedulingRelationBlueprint.objects.filter(first=subtask.task_blueprint, second=successor.task_blueprint) | - TaskSchedulingRelationBlueprint.objects.filter(first=successor.task_blueprint, second=subtask.task_blueprint)).all() - if relations: - # there should be only one scheduling relation between the tasks - relation = relations[0] - successor_start_time += timedelta(seconds=relation.time_offset) + # Note: subtasks either have the same parent task(s) or different ones, no partial overlap. 
+ # we now need to look up all combinations between subtask and successor blueprints + # to find if theres a relation with a time offset between the tasks... + time_offsets = [] + for tb in subtask.task_blueprints.all(): + for successor_tb in successor.task_blueprints.all(): + if tb.id != successor_tb.id: + relations = (TaskSchedulingRelationBlueprint.objects.filter(first=tb, second=successor_tb) | + TaskSchedulingRelationBlueprint.objects.filter(first=successor_tb, second=tb)).all() + + if relations: + # there should be only one scheduling relation between the tasks + time_offsets += [relations[0].time_offset] + + if len(time_offsets) > 0: + successor_start_time += timedelta(seconds=max(time_offsets)) # update the starttime and recurse to shift the successor successors as well update_start_time_and_shift_successors_until_after_stop_time(successor, successor_start_time) @@ -953,17 +1019,18 @@ def get_station_groups(subtask): :return: station_groups which is a list of dict. { station_list, max_nr_missing } """ station_groups = [] - if 'calibrator' in subtask.task_blueprint.specifications_template.name.lower(): - # Calibrator requires related Target Task Observation for some specifications - target_task_blueprint = get_related_target_observation_task_blueprint(subtask.task_blueprint) - if target_task_blueprint is None: - raise SubtaskException("Cannot retrieve related target observation of task_blueprint %d (subtask %d)" % - (subtask.task_blueprint.id, subtask.id)) - if "station_groups" in target_task_blueprint.specifications_doc.keys(): - station_groups = target_task_blueprint.specifications_doc["station_groups"] - else: - if "station_groups" in subtask.task_blueprint.specifications_doc.keys(): - station_groups = subtask.task_blueprint.specifications_doc["station_groups"] + for task_blueprint in subtask.task_blueprints.all(): + if 'calibrator' in task_blueprint.specifications_template.name.lower(): + # Calibrator requires related Target Task Observation for some 
specifications + target_task_blueprint, _ = get_related_target_observation_task_blueprint(task_blueprint) + if target_task_blueprint is None: + raise SubtaskException("Cannot retrieve related target observation of task_blueprint %d (subtask %d)" % + (task_blueprint.id, subtask.id)) + if "station_groups" in target_task_blueprint.specifications_doc.keys(): + station_groups = target_task_blueprint.specifications_doc["station_groups"] + else: + if "station_groups" in task_blueprint.specifications_doc.keys(): + station_groups = task_blueprint.specifications_doc["station_groups"] return station_groups @@ -1152,7 +1219,24 @@ def schedule_observation_subtask(observation_subtask: Subtask): dataproducts = [] specifications_doc = observation_subtask.specifications_doc dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty") - subtask_output = observation_subtask.outputs.first() # TODO: make proper selection, not default first() + + # select correct output for each pointing based on name + subtask_output_dict = {} + + for task_blueprint in observation_subtask.task_blueprints.all(): + output = observation_subtask.outputs.filter(task_blueprint=task_blueprint).first() + if not output: + raise SubtaskSchedulingException('Cannot schedule subtask id=%s because it is missing the output for ' + 'task_blueprint id=%s (subtask has associated task_blueprints=%s, but ' + 'has outputs for task_blueprint=%s' % (observation_subtask.id, + task_blueprint.id, + [(tb.id, tb.specifications_template.type) for tb in observation_subtask.task_blueprints.all()], + [(out.task_blueprint.id, out.task_blueprint.specifications_template.type) for out in observation_subtask.outputs.all()])) + if 'SAPs' in task_blueprint.specifications_doc: # target + for sap in task_blueprint.specifications_doc['SAPs']: + subtask_output_dict[sap['name']] = output + if 'pointing' in task_blueprint.specifications_doc: # calibrator + subtask_output_dict[task_blueprint.specifications_doc['name']] = 
output # create SAP objects, as observations create new beams antennaset = specifications_doc['stations']['antenna_set'] @@ -1173,9 +1257,13 @@ def schedule_observation_subtask(observation_subtask: Subtask): specifications_template=SAPTemplate.objects.get(name="SAP")) for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings'])] # store everything below this directory + # Support for several projects will be added in TMSS-689, for now catch it. + project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in observation_subtask.task_blueprints.all()]) + if len(project_set) != 1: + raise SubtaskSchedulingException('Cannot schedule subtask id=%s because it references task blueprints that belong to different projects=%s' % (observation_subtask.id, project_set)) directory = "/data/%s/%s/L%s" % ("projects" if isProductionEnvironment() else "test-projects", - observation_subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project.name, - observation_subtask.id) + list(project_set)[0], # TMSS-689: use correct project name for each dataproduct + observation_subtask.id) # create correlated dataproducts if specifications_doc['COBALT']['correlator']['enabled']: @@ -1183,6 +1271,10 @@ def schedule_observation_subtask(observation_subtask: Subtask): sb_nr_offset = 0 # subband numbers run from 0 to (nr_subbands-1), increasing across SAPs for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings']): + if pointing['name'] in subtask_output_dict: + subtask_output = subtask_output_dict[pointing['name']] + else: + raise SubtaskSchedulingException('Cannot schedule subtask id=%s because the output for pointing name=%s cannot be determined.' 
% (observation_subtask.id, pointing['name'])) for sb_nr, subband in enumerate(pointing['subbands'], start=sb_nr_offset): dataproducts.append(Dataproduct(filename="L%d_SAP%03d_SB%03d_uv.MS" % (observation_subtask.id, sap_nr, sb_nr), directory=directory+"/uv", @@ -1224,7 +1316,7 @@ def schedule_observation_subtask(observation_subtask: Subtask): directory=directory+("/cs" if coherent else "/is"), dataformat=Dataformat.objects.get(value="Beamformed"), datatype=Datatype.objects.get(value="time series"), - producer=subtask_output, + producer=observation_subtask.outputs.first(), # todo: select correct output. I tried "subtask_output_dict[sap['name']]" but tests fail because the sap's name is not in the task blueprint. Maybe it's just test setup and this should work? specifications_doc={"sap": specifications_doc['stations']['digital_pointings'][sap_nr]["name"], "coherent": coherent, "identifiers": {"pipeline_index": pipeline_nr, "tab_index": tab_nr, "stokes_index": stokes_nr, "part_index": part_nr}}, specifications_template=dataproduct_specifications_template_timeseries, feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema), @@ -1386,7 +1478,7 @@ def schedule_ingest_subtask(ingest_subtask: Subtask): ingest_subtask.save() # check permission pre-requisites - scheduling_unit_blueprint = ingest_subtask.task_blueprint.scheduling_unit_blueprint + scheduling_unit_blueprint = ingest_subtask.task_blueprints.first().scheduling_unit_blueprint # first() is fine because we assume an ingest subtask does not serve tasks across SU boundaries if scheduling_unit_blueprint.ingest_permission_required: if scheduling_unit_blueprint.ingest_permission_granted_since is None or scheduling_unit_blueprint.ingest_permission_granted_since > datetime.utcnow(): raise SubtaskSchedulingException("Cannot schedule ingest subtask id=%d because it requires explicit permission and the permission has not been granted (yet)" % (ingest_subtask.pk,)) @@ -1416,7 +1508,9 @@ def 
schedule_ingest_subtask(ingest_subtask: Subtask): ingest_subtask_input.dataproducts.set(input_dataproducts) # define output and create output dataproducts. - ingest_subtask_output = SubtaskOutput.objects.create(subtask=ingest_subtask) + tb = ingest_subtask_input.producer.task_blueprint # output dataproducts are linked to the same task as its input dataproduct + ingest_subtask_output = SubtaskOutput.objects.create(subtask=ingest_subtask, + task_blueprint=tb) # prepare identifiers in bulk for each output_dataproduct dp_gids = [SIPidentifier(source="TMSS") for _ in input_dataproducts] @@ -1514,7 +1608,7 @@ def create_and_schedule_subtasks_from_task_blueprint(task_blueprint: TaskBluepri def schedule_independent_subtasks_in_task_blueprint(task_blueprint: TaskBlueprint, start_time: datetime=None) -> [Subtask]: '''Convenience method: Schedule (and return) the subtasks in the task_blueprint that are not dependend on any predecessors''' - independent_subtasks = list(Subtask.independent_subtasks().filter(task_blueprint_id=task_blueprint.id, state__value=SubtaskState.Choices.DEFINED.value).all()) + independent_subtasks = list(Subtask.independent_subtasks().filter(task_blueprints__id=task_blueprint.id, state__value=SubtaskState.Choices.DEFINED.value).all()) for subtask in independent_subtasks: if start_time is not None: @@ -1606,14 +1700,15 @@ def get_observation_task_specification_with_check_for_calibrator(subtask): :param: subtask object :return: task_spec: the specifications_doc of the blue print task which is allways a target observation """ - if 'calibrator' in subtask.task_blueprint.specifications_template.name.lower(): - # Calibrator requires related Target Task Observation for some specifications - target_task_blueprint = get_related_target_observation_task_blueprint(subtask.task_blueprint) - if target_task_blueprint is None: - raise SubtaskCreationException("Cannot retrieve specifications for subtask id=%d because no related target observation is found " % subtask.pk) 
- task_spec = target_task_blueprint.specifications_doc - logger.info("Using specifications for calibrator observation (id=%s) from target observation task_blueprint id=%s", - subtask.task_blueprint.id, target_task_blueprint.id) - else: - task_spec = subtask.task_blueprint.specifications_doc - return task_spec + for task_blueprint in subtask.task_blueprints.all(): + if 'calibrator' in task_blueprint.specifications_template.name.lower(): + # Calibrator requires related Target Task Observation for some specifications + target_task_blueprint, _ = get_related_target_observation_task_blueprint(task_blueprint) + if target_task_blueprint is None: + raise SubtaskCreationException("Cannot retrieve specifications for subtask id=%d because no related target observation is found " % subtask.pk) + task_spec = target_task_blueprint.specifications_doc + logger.info("Using specifications for calibrator observation (id=%s) from target observation task_blueprint id=%s", + task_blueprint.id, target_task_blueprint.id) + else: + task_spec = task_blueprint.specifications_doc + return task_spec diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py index 24be767153203f7c83edaaa2b936ad782317b9c0..aefffee28590dba02326a1ab116dbc38324faedf 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py @@ -788,7 +788,7 @@ class SchedulingUnitBlueprintViewSet(LOFARViewSet): operation_description="Get the subtask logging urls of this schedulingunit blueprint.") @action(methods=['get'], detail=True, url_name='get_all_subtasks_log_urls') def get_all_subtasks_log_urls(self, request, pk=None): - subtasks = models.Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint_id=pk) + subtasks = models.Subtask.objects.filter(task_blueprints__scheduling_unit_blueprint_id=pk) result = [] for subtask in subtasks: if subtask.log_url != "": diff 
--git a/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py b/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py index 4616f51c172043d48486bf833ef01fe1ddd2695c..99420f58829802e352481b51f9ab12113cbb18db 100755 --- a/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py @@ -121,7 +121,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) - #ingest_subtask = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, + #ingest_subtask = models.Subtask.objects.get(task_blueprints__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, # specifications_template__type__value=TaskType.Choices.INGEST.value) scheduling_unit_draft.refresh_from_db() @@ -434,7 +434,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) - #ingest_subtask = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, + #ingest_subtask = models.Subtask.objects.get(task_blueprints__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, # specifications_template__type__value=TaskType.Choices.INGEST.value) scheduling_unit_draft.refresh_from_db() @@ -645,7 +645,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) - #ingest_subtask = 
models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, + #ingest_subtask = models.Subtask.objects.get(task_blueprints__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, # specifications_template__type__value=TaskType.Choices.INGEST.value) scheduling_unit_draft.refresh_from_db() @@ -883,7 +883,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) - #ingest_subtask = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, + #ingest_subtask = models.Subtask.objects.get(task_blueprints__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, # specifications_template__type__value=TaskType.Choices.INGEST.value) scheduling_unit_draft.refresh_from_db() @@ -1180,7 +1180,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) - #ingest_subtask = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, + #ingest_subtask = models.Subtask.objects.get(task_blueprints__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, # specifications_template__type__value=TaskType.Choices.INGEST.value) scheduling_unit_draft.refresh_from_db() diff --git a/SAS/TMSS/backend/test/t_adapter.py b/SAS/TMSS/backend/test/t_adapter.py index 772a2d43ed706e328371dc2cdb048f38f65db9ed..f4d92e8ec8cedf1d9222aedc65d09be6bb18e9a9 100755 --- a/SAS/TMSS/backend/test/t_adapter.py +++ b/SAS/TMSS/backend/test/t_adapter.py @@ -63,6 +63,7 @@ class ObservationParsetAdapterTest(unittest.TestCase): subtask_template = models.SubtaskTemplate.objects.get(name='observation control') 
subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc) subtask:models.Subtask = models.Subtask.objects.create(**subtask_data) + subtask.task_blueprints.set([models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())]) subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask)) dataproduct:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output)) return subtask @@ -220,7 +221,7 @@ class SIPadapterTest(unittest.TestCase): # Create SubTask(output) subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc) subtask:models.Subtask = models.Subtask.objects.create(**subtask_data) - + subtask.task_blueprints.set([models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())]) subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask)) # Create Dataproduct dataproduct: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(feedback_doc=feedback_doc, producer=subtask_output)) diff --git a/SAS/TMSS/backend/test/t_complex_serializers.py b/SAS/TMSS/backend/test/t_complex_serializers.py index c6e27c5424809cbc36e07a8a92ef4d6c09222bf5..c49d0ae9940b02fcf4fc70b8081fb2c071c66783 100755 --- a/SAS/TMSS/backend/test/t_complex_serializers.py +++ b/SAS/TMSS/backend/test/t_complex_serializers.py @@ -49,7 +49,7 @@ class DynamicRelationalHyperlinkedModelSerializerTestCase(unittest.TestCase): # create some connected objects cls.td_url = test_data_creator.post_data_and_get_url(test_data_creator.TaskDraft(), '/task_draft/') cls.tb_url = test_data_creator.post_data_and_get_url(test_data_creator.TaskBlueprint(draft_url=cls.td_url), '/task_blueprint/') - test_data_creator.post_data_and_get_url(test_data_creator.Subtask(task_blueprint_url=cls.tb_url), '/subtask/') + 
test_data_creator.post_data_and_get_url(test_data_creator.Subtask(task_blueprint_urls=[cls.tb_url]), '/subtask/') def test_GET_task_draft_serializes_to_depth_0_by_default(self): diff --git a/SAS/TMSS/backend/test/t_permissions_system_roles.py b/SAS/TMSS/backend/test/t_permissions_system_roles.py index 5d05682bec00597c71fc3ae94f46eaf6cc35a0d9..00287ea1ac243e495d6373168c2ccb965d2f6477 100755 --- a/SAS/TMSS/backend/test/t_permissions_system_roles.py +++ b/SAS/TMSS/backend/test/t_permissions_system_roles.py @@ -94,12 +94,13 @@ class SystemPermissionTestCase(unittest.TestCase): obs_subtask_data = test_data_creator.Subtask(specifications_template_url=obs_subtask_template['url'], specifications_doc=obs_spec, cluster_url=cluster_url, - task_blueprint_url=obs_task_blueprint['url'], + task_blueprint_urls=[obs_task_blueprint['url']], raw_feedback='Observation.Correlator.channelWidth=3051.7578125') obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/') cls.obs_subtask_id = obs_subtask['id'] obs_subtask_output_url = test_data_creator.post_data_and_get_url( - test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/') + test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url'], + task_blueprint_url=obs_task_blueprint['url']), '/subtask_output/') test_data_creator.post_data_and_get_url( test_data_creator.Dataproduct(filename="L%s_SB000.MS" % obs_subtask['id'], subtask_output_url=obs_subtask_output_url), '/dataproduct/') diff --git a/SAS/TMSS/backend/test/t_scheduling.py b/SAS/TMSS/backend/test/t_scheduling.py index 6a6ff816fce2866f0f34a9c07c805aac6a83bf6c..f750cbd585edc0505ed348ae6ad173d77f50ec1e 100755 --- a/SAS/TMSS/backend/test/t_scheduling.py +++ b/SAS/TMSS/backend/test/t_scheduling.py @@ -73,8 +73,10 @@ def create_subtask_object_for_testing(subtask_type_value, subtask_state_value): task_blueprint = 
models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(specifications_template=models.TaskTemplate.objects.get(name='target observation' if subtask_type_value=='observation' else 'preprocessing pipeline'))) subtask_template_obj = models.SubtaskTemplate.objects.get(name="%s control" % subtask_type_value) subtask_state_obj = models.SubtaskState.objects.get(value=subtask_state_value) - subtask_data = Subtask_test_data(subtask_template=subtask_template_obj, state=subtask_state_obj, task_blueprint=task_blueprint) - return models.Subtask.objects.create(**subtask_data) + subtask_data = Subtask_test_data(subtask_template=subtask_template_obj, state=subtask_state_obj) + subtask = models.Subtask.objects.create(**subtask_data) + subtask.task_blueprints.set([task_blueprint]) + return subtask def create_reserved_stations_for_testing(station_list): @@ -137,10 +139,11 @@ class SchedulingTest(unittest.TestCase): specifications_doc=spec, cluster_url=cluster_url, start_time=datetime.utcnow()+timedelta(minutes=5), - task_blueprint_url=task_blueprint['url']) + task_blueprint_urls=[task_blueprint['url']]) subtask = test_data_creator.post_data_and_get_response_as_json_object(subtask_data, '/subtask/') subtask_id = subtask['id'] - test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=subtask['url']), '/subtask_output/') + test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=subtask['url'], + task_blueprint_url=task_blueprint['url']), '/subtask_output/') client.set_subtask_status(subtask_id, 'defined') subtask = client.schedule_subtask(subtask_id) @@ -196,10 +199,11 @@ class SchedulingTest(unittest.TestCase): specifications_doc=spec, cluster_url=cluster_url, start_time=datetime.utcnow() + timedelta(minutes=5), - task_blueprint_url=task_blueprint['url']) + task_blueprint_urls=[task_blueprint['url']]) subtask = test_data_creator.post_data_and_get_response_as_json_object(subtask_data, '/subtask/') subtask_id = subtask['id'] 
- test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=subtask['url']), '/subtask_output/') + test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=subtask['url'], + task_blueprint_url=task_blueprint['url']), '/subtask_output/') client.set_subtask_status(subtask_id, 'defined') @@ -233,10 +237,11 @@ class SchedulingTest(unittest.TestCase): specifications_doc=spec, cluster_url=cluster_url, start_time=datetime.utcnow() + timedelta(minutes=5), - task_blueprint_url=task_blueprint['url']) + task_blueprint_urls=[task_blueprint['url']]) subtask = test_data_creator.post_data_and_get_response_as_json_object(subtask_data, '/subtask/') subtask_id = subtask['id'] - test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=subtask['url']), '/subtask_output/') + test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=subtask['url'], + task_blueprint_url=task_blueprint['url']), '/subtask_output/') client.set_subtask_status(subtask_id, 'defined') @@ -269,10 +274,11 @@ class SchedulingTest(unittest.TestCase): specifications_doc=spec, cluster_url=cluster_url, start_time=datetime.utcnow()+timedelta(minutes=5), - task_blueprint_url=task_blueprint['url']) + task_blueprint_urls=[task_blueprint['url']]) subtask = test_data_creator.post_data_and_get_response_as_json_object(subtask_data, '/subtask/') subtask_id = subtask['id'] - test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=subtask['url']), + test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=subtask['url'], + task_blueprint_url=task_blueprint['url']), '/subtask_output/') client.set_subtask_status(subtask_id, 'defined') @@ -295,9 +301,10 @@ class SchedulingTest(unittest.TestCase): obs_subtask_data = test_data_creator.Subtask(specifications_template_url=obs_subtask_template['url'], specifications_doc=obs_spec, cluster_url=cluster_url, - 
task_blueprint_url=obs_task_blueprint['url']) + task_blueprint_urls=[obs_task_blueprint['url']]) obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/') - obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/') + obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url'], + task_blueprint_url=obs_task_blueprint['url']), '/subtask_output/') test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'], specifications_doc={"sap": "target0", "subband": 0 }, subtask_output_url=obs_subtask_output_url), '/dataproduct/') @@ -311,13 +318,14 @@ class SchedulingTest(unittest.TestCase): pipe_subtask_data = test_data_creator.Subtask(specifications_template_url=pipe_subtask_template['url'], specifications_doc=pipe_spec, - task_blueprint_url=pipe_task_blueprint['url'], + task_blueprint_urls=[pipe_task_blueprint['url']], cluster_url=cluster_url) pipe_subtask = test_data_creator.post_data_and_get_response_as_json_object(pipe_subtask_data, '/subtask/') # ...and connect it to the observation test_data_creator.post_data_and_get_url(test_data_creator.SubtaskInput(subtask_url=pipe_subtask['url'], subtask_output_url=obs_subtask_output_url), '/subtask_input/') - test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=pipe_subtask['url']), '/subtask_output/') + test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=pipe_subtask['url'], + task_blueprint_url=pipe_task_blueprint['url']), '/subtask_output/') for predecessor in client.get_subtask_predecessors(pipe_subtask['id']): client.set_subtask_status(predecessor['id'], 'finished') @@ -340,9 +348,10 @@ class SchedulingTest(unittest.TestCase): obs_subtask_data = 
test_data_creator.Subtask(specifications_template_url=obs_subtask_template['url'], specifications_doc=obs_spec, cluster_url=cluster_url, - task_blueprint_url=test_data_creator.post_data_and_get_url(test_data_creator.TaskBlueprint(), '/task_blueprint/')) + task_blueprint_urls=[test_data_creator.post_data_and_get_url(test_data_creator.TaskBlueprint(), '/task_blueprint/')]) obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/') - obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/') + obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url'], + task_blueprint_url=obs_subtask['task_blueprints'][0]), '/subtask_output/') test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'], specifications_doc={"sap": "target0", "subband": 0}, subtask_output_url=obs_subtask_output_url), '/dataproduct/') @@ -353,19 +362,20 @@ class SchedulingTest(unittest.TestCase): ingest_subtask_data = test_data_creator.Subtask(specifications_template_url=ingest_subtask_template['url'], specifications_doc=ingest_spec, - task_blueprint_url=obs_subtask['task_blueprint'], + task_blueprint_urls=obs_subtask['task_blueprints'], cluster_url=cluster_url) ingest_subtask = test_data_creator.post_data_and_get_response_as_json_object(ingest_subtask_data, '/subtask/') # ...and connect it to the observation test_data_creator.post_data_and_get_url(test_data_creator.SubtaskInput(subtask_url=ingest_subtask['url'], subtask_output_url=obs_subtask_output_url), '/subtask_input/') - test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=ingest_subtask['url']), '/subtask_output/') + test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=ingest_subtask['url'], + 
task_blueprint_url=obs_subtask['task_blueprints'][0]), '/subtask_output/') # our subtask here has only one known related task for predecessor in client.get_subtask_predecessors(ingest_subtask['id']): client.set_subtask_status(predecessor['id'], 'finished') client.set_subtask_status(ingest_subtask['id'], 'defined') - task_blueprint = client.get_url_as_json_object(ingest_subtask['task_blueprint']) + task_blueprint = client.get_url_as_json_object(ingest_subtask['task_blueprints'][0]) # our subtask here has only one known related task schedulingunit_blueprint = client.get_url_as_json_object(task_blueprint['scheduling_unit_blueprint']) # first, make sure we need but do not have ingest persmission... @@ -505,6 +515,64 @@ class SubtaskInputOutputTest(unittest.TestCase): self.assertEqual(set(pipe_in1.dataproducts.all()), {dp1_1, dp1_3}) self.assertEqual(set(pipe_in2.dataproducts.all()), {dp2_2}) + @mock.patch("lofar.sas.tmss.tmss.tmssapp.subtasks.assign_or_unassign_resources") + def test_combined_target_calibrator_subtask_connects_dataproducts_to_correct_output(self, assign_resources_mock): + """ + Create a subtask that combines a target and parallel calibrator observation. + Schedule the subtask and assert that dataproducts are assigned to both outputs. 
+ """ + + # setup tasks + cal_task_template = models.TaskTemplate.objects.get(name="calibrator observation") + cal_task_spec = get_default_json_object_for_schema(cal_task_template.schema) + + cal_task_draft = models.TaskDraft.objects.create(**TaskDraft_test_data(specifications_template=cal_task_template, specifications_doc=cal_task_spec)) + cal_task_blueprint = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(task_draft=cal_task_draft)) + + target_task_template = models.TaskTemplate.objects.get(name="target observation") + target_task_spec = get_default_json_object_for_schema(target_task_template.schema) + target_task_draft = models.TaskDraft.objects.create(**TaskDraft_test_data(specifications_template=target_task_template, specifications_doc=target_task_spec)) + target_task_blueprint = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(task_draft=target_task_draft, + scheduling_unit_blueprint=cal_task_blueprint.scheduling_unit_blueprint)) + + models.TaskSchedulingRelationBlueprint.objects.create(first=cal_task_blueprint, second=target_task_blueprint, + placement=models.SchedulingRelationPlacement.objects.get(value='parallel')) + + # specify two beams with known number of subbands + target_task_blueprint.specifications_doc['SAPs'] = [{'name': 'target1_combined', 'target': '', 'subbands': [0, 1], + 'digital_pointing': {'angle1': 0.1, 'angle2': 0.1, + 'direction_type': 'J2000'}}, + {'name': 'target2_combined', 'target': '', 'subbands': [2, 3, 4], + 'digital_pointing': {'angle1': 0.1, 'angle2': 0.1, + 'direction_type': 'J2000'}} + ] + target_task_blueprint.save() + cal_task_blueprint.specifications_doc['name'] = "calibrator_combined" + cal_task_blueprint.save() + + # create subtask + create_observation_control_subtask_from_task_blueprint(target_task_blueprint) + subtask = create_observation_control_subtask_from_task_blueprint(cal_task_blueprint) + subtask.start_time = datetime.utcnow() + subtask.stop_time = datetime.utcnow() + subtask.save() + 
+ # assert no dataproducts are connected before scheduling + target_output = subtask.outputs.filter(task_blueprint=target_task_blueprint).first() + cal_output = subtask.outputs.filter(task_blueprint=cal_task_blueprint).first() + self.assertEqual(target_output.dataproducts.count(), 0) + self.assertEqual(cal_output.dataproducts.count(), 0) + + # schedule, and assert subtask state + self.assertEqual('defined', subtask.state.value) + schedule_observation_subtask(subtask) + self.assertEqual('scheduled', subtask.state.value) + + # assert dataproducts are connected to both outputs after scheduling + # task and calibrator tasks should each have associated one dataproduct per subband of the target task + self.assertEqual(target_output.dataproducts.count(), 5) + self.assertEqual(cal_output.dataproducts.count(), 5) + class SAPTest(unittest.TestCase): """ @@ -532,12 +600,13 @@ class SAPTest(unittest.TestCase): subtask_data = test_data_creator.Subtask(specifications_template_url=subtask_template['url'], specifications_doc=spec, cluster_url = cluster_url, - task_blueprint_url=task_blueprint['url'], + task_blueprint_urls=[task_blueprint['url']], start_time=datetime.utcnow() + timedelta(minutes=5), stop_time=datetime.utcnow() + timedelta(minutes=15)) subtask = test_data_creator.post_data_and_get_response_as_json_object(subtask_data, '/subtask/') subtask_id = subtask['id'] - test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=subtask['url']), + test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=subtask['url'], + task_blueprint_url=task_blueprint['url']), '/subtask_output/') subtask_model = models.Subtask.objects.get(id=subtask_id) diff --git a/SAS/TMSS/backend/test/t_scheduling_units.py b/SAS/TMSS/backend/test/t_scheduling_units.py index 48bf809de5810a31de54e767a58e45f61815be4e..98234e7d6bee7b43c22f395e402196538683b288 100644 --- a/SAS/TMSS/backend/test/t_scheduling_units.py +++ 
b/SAS/TMSS/backend/test/t_scheduling_units.py @@ -72,12 +72,13 @@ class SchedulingUnitBlueprintStateTest(unittest.TestCase): # Create observation task task_data = TaskBlueprint_test_data(name="Task Observation "+str(uuid.uuid4()), scheduling_unit_blueprint=schedulingunit_blueprint) task_obs = models.TaskBlueprint.objects.create(**task_data) - subtask_data = Subtask_test_data(task_obs, state=models.SubtaskState.objects.get(value="defined"), + subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value="defined"), subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) if "observation" in skip_create_subtask: subtask_obs = None else: subtask_obs = models.Subtask.objects.create(**subtask_data) + subtask_obs.task_blueprints.set([task_obs]) # Create pipeline task task_data = TaskBlueprint_test_data(name="Task Pipeline", scheduling_unit_blueprint=schedulingunit_blueprint) @@ -85,13 +86,13 @@ class SchedulingUnitBlueprintStateTest(unittest.TestCase): # Need to change the default template type (observation) to pipeline task_pipe.specifications_template = models.TaskTemplate.objects.get(type=models.TaskType.Choices.PIPELINE.value) task_pipe.save() - subtask_data = Subtask_test_data(task_pipe, - state=models.SubtaskState.objects.get(value="defined"), + subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value="defined"), subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) if "pipeline" in skip_create_subtask: subtask_pipe = None else: subtask_pipe = models.Subtask.objects.create(**subtask_data) + subtask_pipe.task_blueprints.set([task_pipe]) # Create ingest task # Because there is no taskTemplate object for ingest by default I have to create one @@ -103,13 +104,13 @@ class SchedulingUnitBlueprintStateTest(unittest.TestCase): task_ingest.save() # There is no template defined for ingest yet ...but I can use pipeline control, only the template type matters # ....should become other thing in 
future but for this test does not matter - subtask_data = Subtask_test_data(task_ingest, - state=models.SubtaskState.objects.get(value="defined"), + subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value="defined"), subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) if "ingest" in skip_create_subtask: subtask_ingest = None else: subtask_ingest = models.Subtask.objects.create(**subtask_data) + subtask_ingest.task_blueprints.set([task_ingest]) return {"observation": {"task": task_obs, "subtask": subtask_obs}, "pipeline": {"task": task_pipe, "subtask": subtask_pipe}, diff --git a/SAS/TMSS/backend/test/t_subtask_validation.py b/SAS/TMSS/backend/test/t_subtask_validation.py index 11c2fc94bf38726ba03658649227c724f73b0a1c..2abd4418e535a5aeb6c8bbd1e91bcd7d49acb876 100755 --- a/SAS/TMSS/backend/test/t_subtask_validation.py +++ b/SAS/TMSS/backend/test/t_subtask_validation.py @@ -68,10 +68,11 @@ class SubtaskValidationTest(unittest.TestCase): subtask_template = self.create_subtask_template(minimal_json_schema()) specifications_doc = '{ this is not a json object }' subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc, - task_blueprint=self.task_blueprint, cluster=self.cluster, state=self.state) + cluster=self.cluster, state=self.state) with self.assertRaises(SchemaValidationException) as context: - models.Subtask.objects.create(**subtask_data) + subtask = models.Subtask.objects.create(**subtask_data) + subtask.task_blueprints.set([self.task_blueprint]) self.assertTrue('invalid json' in str(context.exception).lower()) def test_validate_correlator_schema_with_valid_specification(self): @@ -81,7 +82,7 @@ class SubtaskValidationTest(unittest.TestCase): specifications_doc = get_default_json_object_for_schema(subtask_template.schema) subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc, - task_blueprint=self.task_blueprint, 
cluster=self.cluster, state=self.state) + cluster=self.cluster, state=self.state) subtask = models.Subtask.objects.create(**subtask_data) self.assertIsNotNone(subtask) @@ -94,15 +95,15 @@ class SubtaskValidationTest(unittest.TestCase): # test with invalid json with self.assertRaises(SchemaValidationException) as context: subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc="bogus spec", - task_blueprint=self.task_blueprint, cluster=self.cluster, state=self.state) - models.Subtask.objects.create(**subtask_data) + cluster=self.cluster, state=self.state) + subtask = models.Subtask.objects.create(**subtask_data) # test with valid json, but not according to schema with self.assertRaises(SchemaValidationException) as context: specifications_doc = get_default_json_object_for_schema(subtask_template.schema) specifications_doc['COBALT']['blocksize'] = -1 # invalid value, should cause the SchemaValidationException subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc, - task_blueprint=self.task_blueprint, cluster=self.cluster, state=self.state) + cluster=self.cluster, state=self.state) models.Subtask.objects.create(**subtask_data) self.assertTrue('-1 is less than the minimum' in str(context.exception).lower()) diff --git a/SAS/TMSS/backend/test/t_subtasks.py b/SAS/TMSS/backend/test/t_subtasks.py index 8086f231da703fba4bcdf574bed9940f0ee6d3d2..260f747acbcf75515cae69646b1a683c7ebdcae9 100755 --- a/SAS/TMSS/backend/test/t_subtasks.py +++ b/SAS/TMSS/backend/test/t_subtasks.py @@ -51,7 +51,9 @@ def create_subtask_object_for_testing(subtask_type_value, subtask_state_value): subtask_template_obj = create_subtask_template_for_testing(template_type) subtask_state_obj = models.SubtaskState.objects.get(value=subtask_state_value) subtask_data = Subtask_test_data(subtask_template=subtask_template_obj, state=subtask_state_obj) - return models.Subtask.objects.create(**subtask_data) + subtask = 
models.Subtask.objects.create(**subtask_data) + subtask.task_blueprints.set([models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())]) + return subtask def create_subtask_template_for_testing(template_type: object): @@ -100,7 +102,7 @@ def create_relation_task_blueprint_object_for_testing(blueprint_task_producer, b return task_relation_obj -def create_scheduling_relation_task_blueprint_for_testing(first_task_blueprint, second_task_blueprint): +def create_scheduling_relation_task_blueprint_for_testing(first_task_blueprint, second_task_blueprint, placement='before'): """ Helper function to create a task blueprint relation object between two task blueprint (calibrator and target observation) :param first_task_blueprint: @@ -111,7 +113,7 @@ def create_scheduling_relation_task_blueprint_for_testing(first_task_blueprint, tags=[], first=first_task_blueprint, second=second_task_blueprint, - placement=models.SchedulingRelationPlacement.objects.get(value='before'), + placement=models.SchedulingRelationPlacement.objects.get(value=placement), time_offset=60) return task_scheduling_rel_obj @@ -281,6 +283,80 @@ class SubTasksCreationFromTaskBluePrintCalibrator(unittest.TestCase): self.assertEqual(1.111, subtask.specifications_doc['stations']['analog_pointing']['angle1']) self.assertEqual(2.222, subtask.specifications_doc['stations']['analog_pointing']['angle2']) + def test_create_combined_subtask_from_task_blueprints(self): + """ + Create subtasks from a target task blueprint and a separate calibrator task blueprint. 
+ """ + cal_task_blueprint = create_task_blueprint_object_for_testing(task_template_name="calibrator observation") + target_task_blueprint = create_task_blueprint_object_for_testing() + create_scheduling_relation_task_blueprint_for_testing(cal_task_blueprint, target_task_blueprint, placement='parallel') + + subtask_1 = create_observation_control_subtask_from_task_blueprint(target_task_blueprint) + num_pointings_target = len(subtask_1.specifications_doc['stations']['digital_pointings']) + + # assert target subtask still in defining state + self.assertEqual("defining", str(subtask_1.state)) + + subtask_2 = create_observation_control_subtask_from_task_blueprint(cal_task_blueprint) + + # assert the same subtask is returned + self.assertEqual(subtask_1, subtask_2) + + # assert the calibrator obs was added as an additional beam + num_pointings_calibrator = len(subtask_2.specifications_doc['stations']['digital_pointings']) + self.assertEqual(num_pointings_target + 1, num_pointings_calibrator) + + # assert the subtask is now in defined state + self.assertEqual("defined", str(subtask_2.state)) + + # assert the subtask references both tasks + self.assertEqual(subtask_1.task_blueprints.count(), 2) + self.assertIn(target_task_blueprint, subtask_1.task_blueprints.all()) + self.assertIn(cal_task_blueprint, subtask_1.task_blueprints.all()) + + # assert we have subtask outputs for both tasks + self.assertEqual(subtask_1.outputs.count(), 2) + self.assertEqual(subtask_1.outputs.filter(task_blueprint=target_task_blueprint).count(), 1) + self.assertEqual(subtask_1.outputs.filter(task_blueprint=cal_task_blueprint).count(), 1) + + def test_create_combined_subtask_from_task_blueprints_fails_if_calibrator_handled_before_target(self): + """ + Create subtasks from a target task blueprint and a separate calibrator task blueprint. + Handling calibrator before target task should raise Exception. 
+ """ + cal_task_blueprint = create_task_blueprint_object_for_testing(task_template_name="calibrator observation") + target_task_blueprint = create_task_blueprint_object_for_testing() + create_scheduling_relation_task_blueprint_for_testing(cal_task_blueprint, target_task_blueprint, placement='parallel') + + with self.assertRaises(SubtaskCreationException) as cm: + create_observation_control_subtask_from_task_blueprint(cal_task_blueprint) + create_observation_control_subtask_from_task_blueprint(target_task_blueprint) + + self.assertIn("cannot be added to the target subtask, because it does not exist", str(cm.exception)) + + def test_create_combined_subtask_from_task_blueprints_fails_if_calibrator_does_not_fit(self): + """ + Create subtasks from a target task blueprint and a separate calibrator task blueprint. + And exception is raised when the combined number of subbands exceeds 488. + """ + cal_task_blueprint = create_task_blueprint_object_for_testing(task_template_name="calibrator observation") + target_task_blueprint = create_task_blueprint_object_for_testing() + create_scheduling_relation_task_blueprint_for_testing(cal_task_blueprint, target_task_blueprint, placement='parallel') + + target_task_blueprint.specifications_doc['SAPs'] = [{'name': 'target1', 'target': '', 'subbands': list(range(0, 150)), + 'digital_pointing': {'angle1': 0.1, 'angle2': 0.1, + 'direction_type': 'J2000'}}, + {'name': 'target2', 'target': '', 'subbands': list(range(150, 300)), + 'digital_pointing': {'angle1': 0.2, 'angle2': 0.2, + 'direction_type': 'J2000'}}] + target_task_blueprint.save() + + with self.assertRaises(SubtaskCreationException) as cm: + create_observation_control_subtask_from_task_blueprint(target_task_blueprint) + create_observation_control_subtask_from_task_blueprint(cal_task_blueprint) + + self.assertIn("results in 600 total subbands, but only 488 are possible", str(cm.exception)) + class SubTaskCreationFromTaskBlueprintIngest(unittest.TestCase): diff --git 
a/SAS/TMSS/backend/test/t_tasks.py b/SAS/TMSS/backend/test/t_tasks.py index 88e4791390c6e46ff365372fe86cc79be91f24b3..12ded040d83e4fb685bf56f259fe83cec5d83eec 100755 --- a/SAS/TMSS/backend/test/t_tasks.py +++ b/SAS/TMSS/backend/test/t_tasks.py @@ -264,9 +264,10 @@ class TaskBlueprintStateTest(unittest.TestCase): task_blueprint_data = TaskBlueprint_test_data(name="Task Blueprint With One Subtask") task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) # Create pipeline subtask related to taskblueprint - subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), + subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value="defined"), subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) subtask_pipe = models.Subtask.objects.create(**subtask_data) + subtask_pipe.task_blueprints.set([task_blueprint]) # Do the actual test for test_item in test_table: @@ -333,12 +334,14 @@ class TaskBlueprintStateTest(unittest.TestCase): task_blueprint_data = TaskBlueprint_test_data(name="Task Blueprint With Subtasks") task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) # Create observation and qa subtask related to taskblueprint - subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), + subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value="defined"), subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) subtask_obs = models.Subtask.objects.create(**subtask_data) - subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), + subtask_obs.task_blueprints.set([task_blueprint]) + subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value="defined"), subtask_template=models.SubtaskTemplate.objects.get(name='QA file conversion')) subtask_qa = models.Subtask.objects.create(**subtask_data) + 
subtask_qa.task_blueprints.set([task_blueprint]) # Do the actual test for test_item in test_table: @@ -374,14 +377,19 @@ class TaskBlueprintStateTest(unittest.TestCase): task_blueprint_data = TaskBlueprint_test_data(name="Task Blueprint With Subtasks") task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) # Create observation and qa subtasks related to taskblueprint - subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), + subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value="defined"), subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) subtask_obs1 = models.Subtask.objects.create(**subtask_data) + subtask_obs1.task_blueprints.set([task_blueprint]) subtask_obs2 = models.Subtask.objects.create(**subtask_data) - subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), + subtask_obs2.task_blueprints.set([task_blueprint]) + + subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value="defined"), subtask_template=models.SubtaskTemplate.objects.get(name='QA file conversion')) subtask_qa1 = models.Subtask.objects.create(**subtask_data) + subtask_qa1.task_blueprints.set([task_blueprint]) subtask_qa2 = models.Subtask.objects.create(**subtask_data) + subtask_qa2.task_blueprints.set([task_blueprint]) # Do the actual test for test_item in test_table: diff --git a/SAS/TMSS/backend/test/t_tmssapp_scheduling_REST_API.py b/SAS/TMSS/backend/test/t_tmssapp_scheduling_REST_API.py index 711ebc4a0d5f0b448358b4a302efeb20dfddba8d..1f4dbb16b5f032ef5fd02dc89eb45876c96532c6 100755 --- a/SAS/TMSS/backend/test/t_tmssapp_scheduling_REST_API.py +++ b/SAS/TMSS/backend/test/t_tmssapp_scheduling_REST_API.py @@ -285,7 +285,7 @@ class SubtaskTestCase(unittest.TestCase): GET_and_assert_equal_expected_code(self, BASE_URL + '/subtask/1234321/', 404) def test_subtask_POST_and_GET(self): - st_test_data = 
test_data_creator.Subtask(cluster_url=self.cluster_url, task_blueprint_url=self.task_blueprint_url, specifications_template_url=self.specifications_template_url) + st_test_data = test_data_creator.Subtask(cluster_url=self.cluster_url, task_blueprint_urls=[self.task_blueprint_url], specifications_template_url=self.specifications_template_url) # POST and GET a new item and assert correctness r_dict = POST_and_assert_expected_response(self, BASE_URL + '/subtask/', st_test_data, 201, st_test_data) @@ -298,13 +298,13 @@ class SubtaskTestCase(unittest.TestCase): self.assertGreaterEqual(int(subtask_id), minimium_subtaskid) def test_subtask_PUT_invalid_raises_error(self): - st_test_data = test_data_creator.Subtask(cluster_url=self.cluster_url, task_blueprint_url=self.task_blueprint_url, specifications_template_url=self.specifications_template_url) + st_test_data = test_data_creator.Subtask(cluster_url=self.cluster_url, task_blueprint_urls=[self.task_blueprint_url], specifications_template_url=self.specifications_template_url) PUT_and_assert_expected_response(self, BASE_URL + '/subtask/9876789876/', st_test_data, 404, {}) def test_subtask_PUT(self): - st_test_data = test_data_creator.Subtask(cluster_url=self.cluster_url, task_blueprint_url=self.task_blueprint_url, specifications_template_url=self.specifications_template_url) - st_test_data2 = test_data_creator.Subtask(cluster_url=self.cluster_url, task_blueprint_url=self.task_blueprint_url, specifications_template_url=self.specifications_template_url) + st_test_data = test_data_creator.Subtask(cluster_url=self.cluster_url, task_blueprint_urls=[self.task_blueprint_url], specifications_template_url=self.specifications_template_url) + st_test_data2 = test_data_creator.Subtask(cluster_url=self.cluster_url, task_blueprint_urls=[self.task_blueprint_url], specifications_template_url=self.specifications_template_url) # POST new item, verify r_dict = POST_and_assert_expected_response(self, BASE_URL + '/subtask/', st_test_data, 201, 
st_test_data) @@ -316,7 +316,7 @@ class SubtaskTestCase(unittest.TestCase): GET_OK_and_assert_equal_expected_response(self, url, st_test_data2) def test_subtask_PATCH(self): - st_test_data = test_data_creator.Subtask(cluster_url=self.cluster_url, task_blueprint_url=self.task_blueprint_url, specifications_template_url=self.specifications_template_url) + st_test_data = test_data_creator.Subtask(cluster_url=self.cluster_url, task_blueprint_urls=[self.task_blueprint_url], specifications_template_url=self.specifications_template_url) # POST new item, verify r_dict = POST_and_assert_expected_response(self, BASE_URL + '/subtask/', st_test_data, 201, st_test_data) @@ -332,7 +332,7 @@ class SubtaskTestCase(unittest.TestCase): GET_OK_and_assert_equal_expected_response(self, url, expected_data) def test_subtask_DELETE(self): - st_test_data = test_data_creator.Subtask(cluster_url=self.cluster_url, task_blueprint_url=self.task_blueprint_url, specifications_template_url=self.specifications_template_url) + st_test_data = test_data_creator.Subtask(cluster_url=self.cluster_url, task_blueprint_urls=[self.task_blueprint_url], specifications_template_url=self.specifications_template_url) # POST new item, verify r_dict = POST_and_assert_expected_response(self, BASE_URL + '/subtask/', st_test_data, 201, st_test_data) @@ -343,7 +343,7 @@ class SubtaskTestCase(unittest.TestCase): DELETE_and_assert_gone(self, url) def test_subtask_PROTECT_behavior_on_state_choice_deleted(self): - st_test_data = test_data_creator.Subtask(cluster_url=self.cluster_url, task_blueprint_url=self.task_blueprint_url, specifications_template_url=self.specifications_template_url) + st_test_data = test_data_creator.Subtask(cluster_url=self.cluster_url, task_blueprint_urls=[self.task_blueprint_url], specifications_template_url=self.specifications_template_url) # create dependency that is safe to delete (enums are not populated / re-established between tests) state_data = {'value': 'kickme'} @@ -369,7 +369,7 @@ class 
SubtaskTestCase(unittest.TestCase): template_url=self.task_blueprint_data['specifications_template'], scheduling_unit_blueprint_url=self.task_blueprint_data['scheduling_unit_blueprint']) task_blueprint_url = test_data_creator.post_data_and_get_url(tbp_test_data, '/task_blueprint/') - st_test_data = test_data_creator.Subtask(task_blueprint_url=task_blueprint_url, cluster_url=self.cluster_url, specifications_template_url=self.specifications_template_url) + st_test_data = test_data_creator.Subtask(task_blueprint_urls=[task_blueprint_url], cluster_url=self.cluster_url, specifications_template_url=self.specifications_template_url) # POST new item and verify url = POST_and_assert_expected_response(self, BASE_URL + '/subtask/', st_test_data, 201, st_test_data)['url'] @@ -387,7 +387,7 @@ class SubtaskTestCase(unittest.TestCase): stt_test_data = test_data_creator.SubtaskTemplate() expected_data = test_data_creator.update_schema_from_template("subtasktemplate", stt_test_data) specifications_template_url = test_data_creator.post_data_and_get_url(stt_test_data, '/subtask_template/') - st_test_data = test_data_creator.Subtask(specifications_template_url=specifications_template_url, cluster_url=self.cluster_url, task_blueprint_url=self.task_blueprint_url) + st_test_data = test_data_creator.Subtask(specifications_template_url=specifications_template_url, cluster_url=self.cluster_url, task_blueprint_urls=[self.task_blueprint_url]) # POST new item and verify url = POST_and_assert_expected_response(self, BASE_URL + '/subtask/', st_test_data, 201, st_test_data)['url'] @@ -587,8 +587,8 @@ class SubtaskInputTestCase(unittest.TestCase): # make new subtask_url instance, but reuse related data for speed subtask_url = test_data_creator.post_data_and_get_url(test_data_creator.Subtask(cluster_url=self.subtask_data['cluster'], - task_blueprint_url=self.subtask_data['task_blueprint'], - specifications_template_url=self.subtask_data['specifications_template'], + 
task_blueprint_urls=[self.subtask_data['task_blueprint']], + specifications_template_url=self.subtask_data['specifications_template'], specifications_doc=self.subtask_data['specifications_doc']), '/subtask/') test_patch = {"subtask": subtask_url, "tags": ['FANCYTAG'], @@ -614,7 +614,7 @@ class SubtaskInputTestCase(unittest.TestCase): def test_subtask_input_CASCADE_behavior_on_subtask_deleted(self): # make new subtask_url instance, but reuse related data for speed subtask_url = test_data_creator.post_data_and_get_url(test_data_creator.Subtask(cluster_url=self.subtask_data['cluster'], - task_blueprint_url=self.subtask_data['task_blueprint'], + task_blueprint_urls=[self.subtask_data['task_blueprint']], specifications_template_url=self.subtask_data['specifications_template'], specifications_doc=self.subtask_data['specifications_doc']), '/subtask/') sti_test_data = test_data_creator.SubtaskInput(subtask_url=subtask_url, task_relation_blueprint_url=self.task_relation_blueprint_url, dataproduct_urls=self.dataproduct_urls, subtask_output_url=self.subtask_output_url, task_relation_selection_template_url=self.task_relation_selection_template_url) @@ -1354,7 +1354,7 @@ class SubtaskQueryTestCase(unittest.TestCase): start_time = datetime.now() + timedelta(hours=2, days=day_idx) stop_time = datetime.now() + timedelta(hours=4, days=day_idx) test_data_creator.post_data_and_get_url(test_data_creator.Subtask(start_time=start_time, stop_time=stop_time, - cluster_url=cluster_url, task_blueprint_url=task_blueprint_url), '/subtask/') + cluster_url=cluster_url, task_blueprint_urls=[task_blueprint_url]), '/subtask/') subtasks_test_data_with_start_stop_time = {'clusterB': 50, 'clusterC': 30 } diff --git a/SAS/TMSS/backend/test/t_tmssapp_scheduling_django_API.py b/SAS/TMSS/backend/test/t_tmssapp_scheduling_django_API.py index 682f22659885f52e3a3632ab288861efa19b3b5e..afca166b1a8b2871269661cae58af45b1b79e44d 100755 --- a/SAS/TMSS/backend/test/t_tmssapp_scheduling_django_API.py +++ 
b/SAS/TMSS/backend/test/t_tmssapp_scheduling_django_API.py @@ -132,6 +132,7 @@ class SubtaskOutputTest(unittest.TestCase): # setup test_data = dict(SubtaskOutput_test_data()) test_data['subtask'] = None + test_data['task_blueprint'] = None # assert with self.assertRaises(IntegrityError): @@ -188,7 +189,9 @@ class SubtaskTest(unittest.TestCase): # setup before = datetime.utcnow() - entry = models.Subtask.objects.create(**Subtask_test_data(task_blueprint=self.task_blueprint)) + entry = models.Subtask.objects.create(**Subtask_test_data()) + entry.task_blueprints.set([self.task_blueprint]) + entry.save() after = datetime.utcnow() @@ -199,7 +202,8 @@ class SubtaskTest(unittest.TestCase): def test_Subtask_update_timestamp_gets_changed_correctly(self): # setup - entry = models.Subtask.objects.create(**Subtask_test_data(task_blueprint=self.task_blueprint)) + entry = models.Subtask.objects.create(**Subtask_test_data()) + entry.task_blueprints.set([self.task_blueprint]) before = datetime.utcnow() entry.save() after = datetime.utcnow() @@ -211,7 +215,7 @@ class SubtaskTest(unittest.TestCase): def test_Subtask_prevents_missing_template(self): # setup - test_data = dict(Subtask_test_data(task_blueprint=self.task_blueprint)) + test_data = dict(Subtask_test_data()) test_data['specifications_template'] = None # assert @@ -219,8 +223,9 @@ class SubtaskTest(unittest.TestCase): models.Subtask.objects.create(**test_data) def test_Subtask_predecessors_and_successors_none(self): - subtask1:models.Subtask = models.Subtask.objects.create(**Subtask_test_data(task_blueprint=self.task_blueprint)) - subtask2:models.Subtask = models.Subtask.objects.create(**Subtask_test_data(task_blueprint=self.task_blueprint)) + subtask1:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask2:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + self.assertEqual(set(), set(subtask1.predecessors.all())) self.assertEqual(set(), set(subtask2.predecessors.all())) @@ 
-228,10 +233,14 @@ class SubtaskTest(unittest.TestCase): self.assertEqual(set(), set(subtask2.successors.all())) def test_Subtask_predecessors_and_successors_simple(self): - subtask1:models.Subtask = models.Subtask.objects.create(**Subtask_test_data(task_blueprint=self.task_blueprint)) - subtask2:models.Subtask = models.Subtask.objects.create(**Subtask_test_data(task_blueprint=self.task_blueprint)) + subtask1:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask1.task_blueprints.set([self.task_blueprint]) + subtask1.save() + subtask2:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask2.task_blueprints.set([self.task_blueprint]) + subtask2.save() - output1 = models.SubtaskOutput.objects.create(subtask=subtask1) + output1 = models.SubtaskOutput.objects.create(subtask=subtask1, task_blueprint=self.task_blueprint) models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=subtask2, producer=output1)) self.assertEqual(subtask1, subtask2.predecessors.all()[0]) @@ -239,22 +248,32 @@ class SubtaskTest(unittest.TestCase): def test_Subtask_predecessors_and_successors_complex(self): subtask1:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) - subtask2:models.Subtask = models.Subtask.objects.create(**Subtask_test_data(task_blueprint=subtask1.task_blueprint)) - subtask3:models.Subtask = models.Subtask.objects.create(**Subtask_test_data(task_blueprint=subtask1.task_blueprint)) - subtask4:models.Subtask = models.Subtask.objects.create(**Subtask_test_data(task_blueprint=subtask1.task_blueprint)) - subtask5:models.Subtask = models.Subtask.objects.create(**Subtask_test_data(task_blueprint=subtask1.task_blueprint)) - subtask6:models.Subtask = models.Subtask.objects.create(**Subtask_test_data(task_blueprint=subtask1.task_blueprint)) + subtask2:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask2.task_blueprints.set(subtask1.task_blueprints.all()) + subtask2.save() + 
subtask3:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask3.task_blueprints.set(subtask1.task_blueprints.all()) + subtask3.save() + subtask4:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask4.task_blueprints.set(subtask1.task_blueprints.all()) + subtask4.save() + subtask5:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask5.task_blueprints.set(subtask1.task_blueprints.all()) + subtask5.save() + subtask6:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask6.task_blueprints.set(subtask1.task_blueprints.all()) + subtask6.save() # ST1 ---> ST3 ---> ST4 # | | # ST2 - -> ST5 ---> ST6 - output1 = models.SubtaskOutput.objects.create(subtask=subtask1) - output2 = models.SubtaskOutput.objects.create(subtask=subtask2) - output3 = models.SubtaskOutput.objects.create(subtask=subtask3) - output4 = models.SubtaskOutput.objects.create(subtask=subtask4) - output5 = models.SubtaskOutput.objects.create(subtask=subtask5) - output6 = models.SubtaskOutput.objects.create(subtask=subtask6) + output1 = models.SubtaskOutput.objects.create(subtask=subtask1, task_blueprint=self.task_blueprint) + output2 = models.SubtaskOutput.objects.create(subtask=subtask2, task_blueprint=self.task_blueprint) + output3 = models.SubtaskOutput.objects.create(subtask=subtask3, task_blueprint=self.task_blueprint) + output4 = models.SubtaskOutput.objects.create(subtask=subtask4, task_blueprint=self.task_blueprint) + output5 = models.SubtaskOutput.objects.create(subtask=subtask5, task_blueprint=self.task_blueprint) + output6 = models.SubtaskOutput.objects.create(subtask=subtask6, task_blueprint=self.task_blueprint) models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=subtask3, producer=output1)) models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=subtask3, producer=output2)) @@ -276,7 +295,8 @@ class SubtaskTest(unittest.TestCase): def 
test_Subtask_transformed_dataproducts(self): # setup subtask1:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) - output1:models.SubtaskOutput = models.SubtaskOutput.objects.create(subtask=subtask1) + output1:models.SubtaskOutput = models.SubtaskOutput.objects.create(subtask=subtask1, + task_blueprint=self.task_blueprint) output1_dp:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=output1)) subtask2:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) @@ -284,7 +304,8 @@ class SubtaskTest(unittest.TestCase): input2_dp = output1_dp input2.dataproducts.set([input2_dp]) input2.save() - output2:models.SubtaskOutput = models.SubtaskOutput.objects.create(subtask=subtask2) + output2:models.SubtaskOutput = models.SubtaskOutput.objects.create(subtask=subtask2, + task_blueprint=self.task_blueprint) output2_dp:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=output2)) models.DataproductTransform.objects.create(input=input2_dp, output=output2_dp, identity=True) diff --git a/SAS/TMSS/backend/test/t_tmssapp_specification_REST_API.py b/SAS/TMSS/backend/test/t_tmssapp_specification_REST_API.py index f0c8c331dc951757c7e98c3a3c90b467591446f7..fbcf5dce3898d8fd0e5372649a5205ad1194ab62 100755 --- a/SAS/TMSS/backend/test/t_tmssapp_specification_REST_API.py +++ b/SAS/TMSS/backend/test/t_tmssapp_specification_REST_API.py @@ -2016,6 +2016,12 @@ class SchedulingUnitBlueprintTestCase(unittest.TestCase): # setup subtask_1 = models.Subtask.objects.create(**Subtask_test_data(start_time=datetime(2050, 1, 1, 10, 0, 0), stop_time=datetime(2050, 1, 1, 14, 0, 0))) subtask_2 = models.Subtask.objects.create(**Subtask_test_data(start_time=datetime(2050, 1, 5, 10, 0, 0), stop_time=datetime(2050, 1, 5, 14, 0, 0))) + task_blueprint_1 = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data()) + task_blueprint_2 = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data()) + 
subtask_1.task_blueprints.set([task_blueprint_1]) + subtask_2.task_blueprints.set([task_blueprint_2]) + subtask_1.save() + subtask_2.save() # assert response_1 = GET_and_assert_equal_expected_code(self, BASE_URL + '/scheduling_unit_blueprint/?start_time_after=2050-01-01T9:00:00&stop_time_before=2050-01-01T15:00:00', 200) @@ -2240,10 +2246,10 @@ class TaskBlueprintTestCase(unittest.TestCase): st_test_data_2 = Subtask_test_data() task_blueprint = models.TaskBlueprint.objects.create(**test_data_1) subtask_1 = models.Subtask.objects.create(**st_test_data_1) - subtask_1.task_blueprint = task_blueprint + subtask_1.task_blueprints.set([task_blueprint]) subtask_1.save() subtask_2 = models.Subtask.objects.create(**st_test_data_2) - subtask_2.task_blueprint = task_blueprint + subtask_2.task_blueprints.set([task_blueprint]) subtask_2.save() # assert response_data = GET_and_assert_equal_expected_code(self, BASE_URL + '/task_blueprint/%s/' % task_blueprint.id, 200) @@ -2921,7 +2927,7 @@ class ExtendedViewTestCase(unittest.TestCase): cls.sub_url = test_data_creator.post_data_and_get_url(test_data_creator.SchedulingUnitBlueprint(scheduling_unit_draft_url=cls.sud_url), '/scheduling_unit_blueprint/') cls.td_url = test_data_creator.post_data_and_get_url(test_data_creator.TaskDraft(scheduling_unit_draft_url=cls.sud_url), '/task_draft/') cls.tb_url = test_data_creator.post_data_and_get_url(test_data_creator.TaskBlueprint(draft_url=cls.td_url, scheduling_unit_blueprint_url=cls.sub_url), '/task_blueprint/') - test_data_creator.post_data_and_get_url(test_data_creator.Subtask(task_blueprint_url=cls.tb_url), '/subtask/') + test_data_creator.post_data_and_get_url(test_data_creator.Subtask(task_blueprint_urls=[cls.tb_url]), '/subtask/') def test_GET_scheduling_unit_draft_serializes_referenced_objects(self): # get the extended view on the su draft diff --git a/SAS/TMSS/backend/test/test_utils.py b/SAS/TMSS/backend/test/test_utils.py index 
022e37d2dd671b884d11a1e15c4c247dc915fcd7..853861fcbca0b8470c0c16085d373326f8905a08 100644 --- a/SAS/TMSS/backend/test/test_utils.py +++ b/SAS/TMSS/backend/test/test_utils.py @@ -721,7 +721,7 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int self.create_output_dataproducts = create_output_dataproducts def need_to_handle(self, subtask: models.Subtask) -> bool: - if subtask.task_blueprint.scheduling_unit_blueprint.id != self.scheduling_unit_blueprint_id: + if self.scheduling_unit_blueprint_id not in [tb.scheduling_unit_blueprint.id for tb in subtask.task_blueprints.all()]: return False if subtask.specifications_template.type.value == models.SubtaskType.Choices.OBSERVATION.value and not self.handle_observations: @@ -759,7 +759,7 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int pass # trick: trigger any already scheduled subtasks, cascading in events simulating the run - subtasks = models.Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint_id=self.scheduling_unit_blueprint_id) + subtasks = models.Subtask.objects.filter(task_blueprints__scheduling_unit_blueprint_id=self.scheduling_unit_blueprint_id) for subtask in subtasks.filter(state__value=models.SubtaskState.Choices.SCHEDULED.value): self.onSubTaskStatusChanged(subtask.id, "scheduled") @@ -850,7 +850,7 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int output_dp.feedback_doc = feedback_doc output_dp.save() elif subtask.specifications_template.type.value == models.SubtaskType.Choices.INGEST.value: - project_name = subtask.task_blueprint.draft.scheduling_unit_draft.scheduling_set.project.name + project_name = subtask.task_blueprints.first().draft.scheduling_unit_draft.scheduling_set.project.name # todo: support for multiple projects needs to be picked up in TMSS-689 for output_dp in subtask.output_dataproducts: try: @@ -882,12 +882,13 @@ def
create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int if subtask.specifications_template.type.value == 'ingest': logger.info("subtask id=%d is an ingest task which requires permission in order to be scheduled", subtask.id) - if self.auto_grant_ingest_permission and subtask.task_blueprint.scheduling_unit_blueprint.ingest_permission_required: + if self.auto_grant_ingest_permission and any([tb.scheduling_unit_blueprint.ingest_permission_required for tb in subtask.task_blueprints.all()]): # just granting the permission triggers the scheduling_service to check and schedulable ingest subtasks, # resulting in a scheduled ingest subtask. logger.info("granting ingest subtask id=%d ingest_permission", subtask.id) - subtask.task_blueprint.scheduling_unit_blueprint.ingest_permission_granted_since = datetime.utcnow() - subtask.task_blueprint.scheduling_unit_blueprint.save() + for tb in subtask.task_blueprints.all(): + tb.scheduling_unit_blueprint.ingest_permission_granted_since = datetime.utcnow() + tb.scheduling_unit_blueprint.save() if next_state: sleep(self.delay) # mimic a little 'processing' delay diff --git a/SAS/TMSS/backend/test/tmss_test_data_django_models.py b/SAS/TMSS/backend/test/tmss_test_data_django_models.py index d9296ec1062f925af4bd73121f9886d337225926..09188f46f2e8bf6efe5299d46a0fd4a3eb1228be 100644 --- a/SAS/TMSS/backend/test/tmss_test_data_django_models.py +++ b/SAS/TMSS/backend/test/tmss_test_data_django_models.py @@ -361,11 +361,15 @@ def DataproductFeedbackTemplate_test_data() -> dict: "schema": minimal_json_schema(), "tags": ["TMSS", "TESTING"]} -def SubtaskOutput_test_data(subtask: models.Subtask=None) -> dict: +def SubtaskOutput_test_data(subtask: models.Subtask=None, task_blueprint: models.TaskBlueprint=None) -> dict: if subtask is None: subtask = models.Subtask.objects.create(**Subtask_test_data()) + if task_blueprint is None: + task_blueprint = models. 
TaskBlueprint.objects.create(**TaskBlueprint_test_data()) + return {"subtask": subtask, + "task_blueprint": task_blueprint, "tags":[]} def SubtaskInput_test_data(subtask: models.Subtask=None, producer: models.SubtaskOutput=None, selection_doc=None, selection_template: models.TaskRelationSelectionTemplate=None) -> dict: @@ -388,13 +392,10 @@ def SubtaskInput_test_data(subtask: models.Subtask=None, producer: models.Subtas "selection_template": selection_template, "tags":[]} -def Subtask_test_data(task_blueprint: models.TaskBlueprint=None, subtask_template: models.SubtaskTemplate=None, +def Subtask_test_data(subtask_template: models.SubtaskTemplate=None, specifications_doc: dict=None, start_time=None, stop_time=None, cluster=None, state=None, raw_feedback=None) -> dict: - if task_blueprint is None: - task_blueprint = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data()) - if subtask_template is None: subtask_template = models.SubtaskTemplate.objects.create(**SubtaskTemplate_test_data()) @@ -418,7 +419,7 @@ def Subtask_test_data(task_blueprint: models.TaskBlueprint=None, subtask_templat "stop_time": stop_time, "state": state, "specifications_doc": specifications_doc, - "task_blueprint": task_blueprint, + #"task_blueprint": task_blueprint, # ManyToMany, use set() "specifications_template": subtask_template, "tags": ["TMSS", "TESTING"], "do_cancel": datetime.utcnow(), diff --git a/SAS/TMSS/backend/test/tmss_test_data_rest.py b/SAS/TMSS/backend/test/tmss_test_data_rest.py index 528e81dd5619d2a6318ec85696d55c3e058c0dea..c25339f480bb372e7404c0694fbb7d5ae767e1fb 100644 --- a/SAS/TMSS/backend/test/tmss_test_data_rest.py +++ b/SAS/TMSS/backend/test/tmss_test_data_rest.py @@ -632,12 +632,12 @@ class TMSSRESTTestDataCreator(): return self._cluster_url - def Subtask(self, cluster_url=None, task_blueprint_url=None, specifications_template_url=None, specifications_doc=None, state:str="defining", start_time: datetime=None, stop_time: datetime=None, raw_feedback:str
=None): + def Subtask(self, cluster_url=None, task_blueprint_urls=None, specifications_template_url=None, specifications_doc=None, state:str="defining", start_time: datetime=None, stop_time: datetime=None, raw_feedback:str =None): if cluster_url is None: cluster_url = self.cached_cluster_url - if task_blueprint_url is None: - task_blueprint_url = self.cached_task_blueprint_url + if task_blueprint_urls is None: + task_blueprint_urls = [self.cached_task_blueprint_url] if specifications_template_url is None: specifications_template_url = self.cached_subtask_template_url @@ -661,7 +661,7 @@ class TMSSRESTTestDataCreator(): "stop_time": stop_time, "state": self.django_api_url + '/subtask_state/%s' % (state,), "specifications_doc": specifications_doc, - "task_blueprint": task_blueprint_url, + "task_blueprints": task_blueprint_urls, "specifications_template": specifications_template_url, "tags": ["TMSS", "TESTING"], "do_cancel": datetime.utcnow().isoformat(), @@ -676,11 +676,16 @@ class TMSSRESTTestDataCreator(): self._subtask_url = self.post_data_and_get_url(self.Subtask(), '/subtask/') return self._subtask_url - def SubtaskOutput(self, subtask_url=None): + def SubtaskOutput(self, subtask_url=None, task_blueprint_url=None): + if subtask_url is None: subtask_url = self.cached_subtask_url + if task_blueprint_url is None: + task_blueprint_url = self.cached_task_blueprint_url + return {"subtask": subtask_url, + "task_blueprint": task_blueprint_url, "tags": []} @property