diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py index b73398bc04b11936b24d6f2accf602ec0c3136c8..9729aba52861e1733031ab172c27adde0d861eda 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py @@ -315,10 +315,7 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas parset["Observation.tmssID"] = subtask.pk parset["Observation.processType"] = subtask.specifications_template.type.value.capitalize() parset["Observation.processSubtype"] = "Beam Observation" - project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in subtask.task_blueprints.all()]) - if len(project_set) != 1: - raise ConversionException('Subtask id=%s cannot be converted to parset because it references task blueprint that belong to different projects=%s' % (subtask.id, project_set)) - parset["Observation.Campaign.name"] = list(project_set)[0] + parset["Observation.Campaign.name"] = subtask.project.name parset["Observation.Campaign.PI"] = "classified" # pulp needs the PI to be non-empty parset["Observation.startTime"] = formatDatetime(subtask.start_time) if isinstance(subtask.start_time, datetime) else subtask.start_time parset["Observation.stopTime"] = formatDatetime(subtask.stop_time) if isinstance(subtask.stop_time, datetime) else subtask.stop_time @@ -387,10 +384,7 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas # ---------------------------- # Retrieve the scheduling_unit_blueprint to get piggyback values to set - sub_set = set([tb.scheduling_unit_blueprint for tb in subtask.task_blueprints.all()]) - if len(sub_set) != 1: - raise ConversionException('Subtask id=%s cannot be converted to parset because it references task blueprint that belong to different scheduling unit blueprints=%s' % (subtask.id, sub_set)) - sub = sub_set.pop() + sub = 
subtask.task_blueprint.scheduling_unit_blueprint parset["prefix"] = "LOFAR." parset["Observation.claimPeriod"] = 35 @@ -435,11 +429,8 @@ def _common_parset_dict_for_pipeline_schemas(subtask: models.Subtask) -> dict: parset["Observation.processType"] = "Pipeline" - project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in subtask.task_blueprints.all()]) - if len(project_set) != 1: - raise ConversionException('Subtask pk=%s cannot be converted to parset because it references task blueprint that belong to different projects (names=%s)' % (subtask.pk, project_set)) - parset["Observation.Campaign.name"] = list(project_set)[0] - parset["Observation.Scheduler.taskName"] = subtask.task_blueprints.first().name # Scheduler keys are artefacts of an older time. Their content is deprecated, so we don't care whch task we take this from + parset["Observation.Campaign.name"] = subtask.project.name + parset["Observation.Scheduler.taskName"] = subtask.task_blueprint.name # Scheduler keys are artefacts of an older time. Their content is deprecated, so we don't care which task we take this from parset["Observation.Scheduler.predecessors"] = [] parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster.name parset["Observation.Cluster.ProcessingCluster.clusterPartition"] = 'cpu' diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py index 47772d5a4ec45504390cc655f9f3a3b10548fbb9..9cf6144b9537bf98d9debb12ee113e0227cbcc3e 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py @@ -175,7 +175,6 @@ def create_sip_representation_for_subtask(subtask: Subtask): ) subarraypointings.append(sip_sap) - concatenated_task_descriptions = "\n".join([tb.description for tb in subtask.task_blueprints.order_by("specifications_template__name").all()]) # we could also order by "specifications_template__type__value"? 
observation = siplib.Observation(observingmode=constants.OBSERVINGMODETYPE_BEAM_OBSERVATION, # can be hardcoded for an observation instrumentfilter=mapping_filterset_type_TMSS_2_SIP[subtask.specifications_doc['stations']['filter']], clock_frequency="200", # fixed, @@ -193,7 +192,7 @@ def create_sip_representation_for_subtask(subtask: Subtask): process_map=process_map, channelwidth_frequency=None, # NA any more ('BlueGene compatibility' see comment in LTA-SIP.xsd) channelwidth_frequencyunit=constants.FREQUENCYUNIT_HZ, # fixed - observationdescription=concatenated_task_descriptions, + observationdescription=subtask.task_blueprint.description, channelspersubband=0, # NA any more ('BlueGene compatibility' see comment in LTA-SIP.xsd) subarraypointings=subarraypointings, transientbufferboardevents=None # fixed @@ -206,11 +205,9 @@ def create_sip_representation_for_subtask(subtask: Subtask): sourcedata_identifiers += [get_siplib_identifier(dp.global_identifier, "Dataproduct id=%s" % dp.id) for dp in input.dataproducts.all()] # todo: use correct id, lookup based on TMSS reference or so, tbd if not sourcedata_identifiers: raise TMSSException("There seems to be no subtask input associated to your pipeline subtask id %s. Please define what data the pipeline processed." % subtask.id) - if subtask.task_blueprints.count() > 1: - raise TMSSException("There are several task blueprints pk=%s associated to subtask pk=%s, but for pipelines, only a single task is supported." % ([tb.pk for tb in subtask.task_blueprints.all()], subtask.pk)) pipeline_map = siplib.PipelineMap( - name=subtask.task_blueprints.first().name, # there is only one + name=subtask.task_blueprint.name, version='unknown', # todo from subtask.specifications_doc? from feedback (we have feedback and storagewriter versions there, not pipeline version or sth)? 
sourcedata_identifiers=sourcedata_identifiers, process_map=process_map) @@ -591,11 +588,7 @@ def generate_sip_for_dataproduct(dataproduct: Dataproduct) -> siplib.Sip: sip_dataproduct = create_sip_representation_for_dataproduct(dataproduct) # Gather project details - project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in dataproduct.producer.subtask.task_blueprints.all()]) - if len(project_set) != 1: - # todo: support for multiple projects needs to be picked up in TMSS-689 - raise TMSSException('Dataproduct pk=%s references task blueprints that belong to different projects (names=%s). This can currently not be represented in SIP format.' % (dataproduct.pk, project_set)) - project = dataproduct.producer.subtask.project # there must be only one task blueprint + project = dataproduct.producer.subtask.project project_code = project.name project_primaryinvestigator = 'project_primaryinvestigator' project_contactauthor = 'project_contactauthor' diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py index 728d97466026dd99b5fb2247c222be2cddccf187..8942a477ae83f2394c55d01d21e5867d1b976b14 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 3.0.9 on 2021-08-24 14:29 +# Generated by Django 3.0.9 on 2021-09-02 15:48 from django.conf import settings import django.contrib.auth.models @@ -1237,8 +1237,8 @@ class Migration(migrations.Migration): ), migrations.AddField( model_name='subtask', - name='task_blueprints', - field=models.ManyToManyField(blank=True, help_text='Task Blueprint to which this Subtask belongs.', related_name='subtasks', to='tmssapp.TaskBlueprint'), + name='task_blueprint', + field=models.ForeignKey(help_text='The parent TaskBlueprint.', null=True, on_delete=django.db.models.deletion.PROTECT, related_name='subtasks', 
to='tmssapp.TaskBlueprint'), ), migrations.AddConstraint( model_name='stationtimeline', diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py index 60d07e5e55fc169b550c7f24a54a646fd6da78d4..72eabea2299362aab642d7fe8b9f7bcb15bed282 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py @@ -150,25 +150,17 @@ class Subtask(BasicCommon, ProjectPropertyMixin, TemplateSchemaMixin): state = ForeignKey('SubtaskState', null=False, on_delete=PROTECT, related_name='task_states', help_text='Subtask state (see Subtask State Machine).') primary = BooleanField(default=False, help_text='TRUE if this is the one-and-only primary subtask in a parent TaskBlueprint.') specifications_doc = JSONField(help_text='Final specifications, as input for the controller.') - task_blueprints = ManyToManyField('TaskBlueprint', related_name='subtasks', blank=True, help_text='Task Blueprint to which this Subtask belongs.') + task_blueprint = ForeignKey('TaskBlueprint', null=True, on_delete=PROTECT, related_name='subtasks', help_text='The parent TaskBlueprint.') #TODO: be more strict with null=False specifications_template = ForeignKey('SubtaskTemplate', null=False, on_delete=PROTECT, help_text='Schema used for specifications_doc.') cluster = ForeignKey('Cluster', null=True, on_delete=PROTECT, help_text='Where the Subtask is scheduled to run (NULLable).') # resource_claim = ForeignKey("ResourceClaim", null=False, on_delete=PROTECT) # todo <-- how is this external reference supposed to work? 
created_or_updated_by_user = ForeignKey(User, null=True, editable=False, on_delete=PROTECT, help_text='The user who created / updated the subtask.') raw_feedback = CharField(null=True, max_length=1048576, help_text='The raw feedback for this Subtask') global_identifier = OneToOneField('SIPidentifier', null=False, editable=False, on_delete=PROTECT, help_text='The global unique identifier for LTA SIP.') - path_to_project = 'task_blueprints__scheduling_unit_blueprint__draft__scheduling_set__project' - - # TODO: find a way to enforce that only one subtask per TaskBlueprint has primary==True. The ManyToManyField makes that dificult. RawSQL? CheckConstraint and Q? - # JS 2021-08-24: Had a chat with Joern about the "problem" of the many-to-many relation between tasks and subtasks. - # It will be too much work (for now) to introduce either the old normal foreignkey relation where 1 task has 1-n subtasks. - # It will also be too much work to make a non-race-condition check on 1 primary subtask per parent task. - # There is a race condition because you first need to create both the task and subtask and then make the link. In between these steps there are forbidden possibilities. - # It is also quircky to introduce a foreignkey relation between task and subtask like 'primary_subtask' or 'is_primary_subtask_of' besides the existing task-subtask many-to-many-relation. - # So, we decided that for now we accept that we just cannot enforce a single-primary-subtask-per-task at database level. - # We think this is acceptable, because only our django tmss application creates subtasks (that's us!), so we are in control of setting the correct subtask as the one-and-only primary subtask. 
+ path_to_project = 'task_blueprint__scheduling_unit_blueprint__draft__scheduling_set__project' + # class Meta(BasicCommon.Meta): - # constraints = [UniqueConstraint(fields=['primary', 'task_blueprints'], name='subtask_unique_primary_subtask')] + # constraints = [UniqueConstraint(fields=['primary', 'task_blueprint'], name='subtask_unique_primary_subtask_within_parent_task')] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -189,12 +181,12 @@ class Subtask(BasicCommon, ProjectPropertyMixin, TemplateSchemaMixin): if self.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: # observations have a specified duration, so grab it from the spec. # In case we have several associated tasks: use the longest duration, since we assume that tasks will run in parallel (there would be no reason to combine them into a subtask). - return timedelta(seconds=max([tb.specifications_doc.get('duration', tb.specifications_doc.get('target', {}).get('duration', 0)) for tb in self.task_blueprints.all()])) + return timedelta(seconds=self.task_blueprint.specifications_doc.get('duration', self.task_blueprint.specifications_doc.get('target', {}).get('duration', 0))) if self.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: # pipelines usually do not have a specified duration, so make a guess (half the obs duration?). # In case we have several associated tasks: this guess is probably in no way accurate anyway, so we assume it does not really matter which task blueprint we refer to here - return timedelta(seconds=self.task_blueprints.first().specifications_doc.get('duration', max(p.specified_duration.total_seconds() for p in self.predecessors)/2)) + return timedelta(seconds=self.task_blueprint.specifications_doc.get('duration', max(p.specified_duration.total_seconds() for p in self.predecessors)/2)) # other subtasktypes usually depend on cpu/data/network etc. 
So, make a guess (for now) return timedelta(minutes=5) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py index ea277797d8304f5bfe8eabcf4b00579b324fd8da..1d2d09b011ba812ccda109acc442a57ab16e28dc 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py @@ -757,9 +757,9 @@ class SchedulingUnitBlueprint(ProjectPropertyMixin, TemplateSchemaMixin, NamedCo return SchedulingUnitBlueprint.Status.SCHEDULABLE.value def _task_graph_instantiated(self): - from .scheduling import Subtask # import here to prevent cirular imports + from .scheduling import Subtask # import here to prevent circular imports return self._get_total_nbr_tasks() > 0 and \ - Subtask.objects.filter(task_blueprints__scheduling_unit_blueprint=self).count() > 0 + Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint=self).count() > 0 def _all_task_finished(self, status_overview_counter): return status_overview_counter["finished"] == self._get_total_nbr_tasks() diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 890d029e6cebbd73213ea467325837e91355e7ef..b389b09bb5217af250a3646f760b20b8fbd52d75 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -93,7 +93,7 @@ def create_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subta subtasks.append(subtask) except Exception as e: logger.exception(e) - raise SubtaskCreationException('Cannot create subtasks for task id=%s for its schema name=%s in generator %s' % (task_blueprint.pk, template_name, generator)) from e + raise SubtaskCreationException('Cannot create subtasks for task id=%s for its schema name=\'%s\' in generator %s' % (task_blueprint.pk, template_name, generator)) from e return subtasks else: logger.error('Cannot create subtasks for task id=%s because no 
generator exists for its schema name=%s' % (task_blueprint.pk, template_name)) @@ -489,7 +489,7 @@ def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskB "stop_time": None, "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value), "specifications_doc": specifications_doc, - #"task_blueprint": task_blueprint, # ManyToMany, so use set()! + "task_blueprint": task_blueprint, "specifications_template": subtask_template, "tags": [], "primary": True, @@ -497,7 +497,6 @@ def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskB } subtask = Subtask.objects.create(**subtask_data) - subtask.task_blueprints.set(list(subtask.task_blueprints.all()) + [task_blueprint]) # step 2: create and link subtask input/output # an observation has no input, it just produces output data @@ -526,8 +525,7 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) https://support.astron.nl/confluence/display/TMSS/Specification+Flow ''' # step 0: check pre-requisites - for tb in observation_subtask.task_blueprints.all(): - check_prerequities_for_subtask_creation(tb) + check_prerequities_for_subtask_creation(observation_subtask.task_blueprint) if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value: raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % ( @@ -551,13 +549,12 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) qafile_subtask_data = { "start_time": None, "stop_time": None, "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value), - #"task_blueprint": observation_subtask.task_blueprint, # ManyToMany, use set() + "task_blueprint": observation_subtask.task_blueprint, "specifications_template": qafile_subtask_template, "specifications_doc": qafile_subtask_spec, "primary": False, "cluster": observation_subtask.cluster} qafile_subtask = 
Subtask.objects.create(**qafile_subtask_data) - qafile_subtask.task_blueprints.set(observation_subtask.task_blueprints.all()) # step 2: create and link subtask input/output selection_template = TaskRelationSelectionTemplate.objects.get(name="all") @@ -569,9 +566,8 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) selection_doc=selection_doc, selection_template=selection_template) - for tb in observation_subtask.task_blueprints.all(): - qafile_subtask_output = SubtaskOutput.objects.create(subtask=qafile_subtask, - task_blueprint=tb) + qafile_subtask_output = SubtaskOutput.objects.create(subtask=qafile_subtask, + task_blueprint=observation_subtask.task_blueprint) # step 3: set state to DEFINED qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) @@ -608,8 +604,7 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta https://support.astron.nl/confluence/display/TMSS/Specification+Flow ''' # step 0: check pre-requisites - for tb in qafile_subtask.task_blueprints.all(): - check_prerequities_for_subtask_creation(tb) + check_prerequities_for_subtask_creation(qafile_subtask.task_blueprint) if qafile_subtask.specifications_template.type.value != SubtaskType.Choices.QA_FILES.value: raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % ( @@ -633,13 +628,12 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta qaplots_subtask_data = { "start_time": None, "stop_time": None, "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value), - #"task_blueprint": qafile_subtask.task_blueprint, + "task_blueprint": qafile_subtask.task_blueprint, "specifications_template": qaplots_subtask_template, "specifications_doc": qaplots_subtask_spec_doc, "primary": False, "cluster": qafile_subtask.cluster} qaplots_subtask = Subtask.objects.create(**qaplots_subtask_data) - 
qaplots_subtask.task_blueprints.set(qafile_subtask.task_blueprints.all()) # step 2: create and link subtask input/output selection_template = TaskRelationSelectionTemplate.objects.get(name="all") @@ -649,9 +643,8 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta selection_doc=selection_doc, selection_template=selection_template) - for tb in qafile_subtask.task_blueprints.all(): - qaplots_subtask_output = SubtaskOutput.objects.create(subtask=qaplots_subtask, - task_blueprint=tb) + qaplots_subtask_output = SubtaskOutput.objects.create(subtask=qaplots_subtask, + task_blueprint=qafile_subtask.task_blueprint) # step 3: set state to DEFINED qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) @@ -686,13 +679,12 @@ def create_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint, s subtask_data = { "start_time": None, "stop_time": None, "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value), - #"task_blueprint": task_blueprint, # ManyToMany, so use set()! + "task_blueprint": task_blueprint, "specifications_template": subtask_template, "specifications_doc": subtask_specs, "primary": True, "cluster": Cluster.objects.get(name=cluster_name) } subtask = Subtask.objects.create(**subtask_data) - subtask.task_blueprints.set([task_blueprint]) # step 2: create and link subtask input/output for task_relation_blueprint in task_blueprint.produced_by.all(): @@ -744,13 +736,12 @@ def create_ingest_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> subtask_data = {"start_time": None, "stop_time": None, "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value), - #"task_blueprint": task_blueprint, # ManyToMany, so use set()! 
+ "task_blueprint": task_blueprint, "specifications_template": subtask_template, "specifications_doc": subtask_specs, "primary": True, "cluster": Cluster.objects.get(name=cluster_name)} subtask = Subtask.objects.create(**subtask_data) - subtask.task_blueprints.set([task_blueprint]) # step 2: create and link subtask input for task_relation_blueprint in task_blueprint.produced_by.all(): @@ -787,12 +778,12 @@ def create_cleanup_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> subtask_data = {"start_time": None, "stop_time": None, "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value), + "task_blueprint": task_blueprint, "specifications_template": subtask_template, "specifications_doc": subtask_specs, "primary": True, "cluster": Cluster.objects.get(name=cluster_name)} subtask = Subtask.objects.create(**subtask_data) - subtask.task_blueprints.set([task_blueprint]) # step 2: create and link subtask input # for this cleanup subtask an 'input' seems a bit weird, but it actually makes sense! @@ -921,7 +912,7 @@ def update_subtasks_start_times_for_scheduling_unit(scheduling_unit: SchedulingU for task_blueprint in scheduling_unit.task_blueprints.all(): defined_independend_subtasks = task_blueprint.subtasks.filter(state__value='defined').filter(inputs=None).all() for subtask in defined_independend_subtasks: - update_start_time_and_shift_successors_until_after_stop_time(subtask, start_time + min([tb.relative_start_time for tb in subtask.task_blueprints.all()])) # todo: min is correct here? 
+ update_start_time_and_shift_successors_until_after_stop_time(subtask, start_time + subtask.task_blueprint.relative_start_time) def update_start_time_and_shift_successors_until_after_stop_time(subtask: Subtask, start_time: datetime): @@ -943,15 +934,13 @@ def shift_successors_until_after_stop_time(subtask: Subtask): # we now need to look up all combinations between subtask and successor blueprints # to find if theres a relation with a time offset between the tasks... time_offsets = [] - for tb in subtask.task_blueprints.all(): - for successor_tb in successor.task_blueprints.all(): - if tb.id != successor_tb.id: - relations = (TaskSchedulingRelationBlueprint.objects.filter(first=tb, second=successor_tb) | - TaskSchedulingRelationBlueprint.objects.filter(first=successor_tb, second=tb)).all() + if subtask.task_blueprint.id != successor.task_blueprint.id: + relations = (TaskSchedulingRelationBlueprint.objects.filter(first=subtask.task_blueprint, second=successor.task_blueprint) | + TaskSchedulingRelationBlueprint.objects.filter(first=successor.task_blueprint, second=subtask.task_blueprint)).all() - if relations: - # there should be only one scheduling relation between the tasks - time_offsets += [relations[0].time_offset] + if relations: + # there should be only one scheduling relation between the tasks + time_offsets += [relations[0].time_offset] if len(time_offsets) > 0: successor_start_time += timedelta(seconds=max(time_offsets)) @@ -1134,18 +1123,17 @@ def get_station_groups(subtask): :return: station_groups which is a list of dict. 
{ station_list, max_nr_missing } """ station_groups = [] - for task_blueprint in subtask.task_blueprints.all(): - if 'calibrator' in task_blueprint.specifications_template.name.lower(): - # Calibrator requires related Target Task Observation for some specifications - target_task_blueprint, _ = get_related_target_observation_task_blueprint(task_blueprint) - if target_task_blueprint is None: - raise SubtaskException("Cannot retrieve related target observation of task_blueprint %d (subtask %d)" % - (task_blueprint.id, subtask.id)) - if "station_groups" in target_task_blueprint.specifications_doc.keys(): - station_groups = target_task_blueprint.specifications_doc["station_groups"] - else: - if "station_groups" in task_blueprint.specifications_doc.keys(): - station_groups = task_blueprint.specifications_doc["station_groups"] + if 'calibrator' in subtask.task_blueprint.specifications_template.name.lower(): + # Calibrator requires related Target Task Observation for some specifications + target_task_blueprint, _ = get_related_target_observation_task_blueprint(subtask.task_blueprint) + if target_task_blueprint is None: + raise SubtaskException("Cannot retrieve related target observation of task_blueprint %d (subtask %d)" % + (subtask.task_blueprint.id, subtask.id)) + if "station_groups" in target_task_blueprint.specifications_doc.keys(): + station_groups = target_task_blueprint.specifications_doc["station_groups"] + else: + if "station_groups" in subtask.task_blueprint.specifications_doc.keys(): + station_groups = subtask.task_blueprint.specifications_doc["station_groups"] return station_groups @@ -1313,15 +1301,9 @@ def _bulk_create_dataproducts_with_global_identifiers(dataproducts: list) -> lis def _output_root_directory(subtask: Subtask) -> str: """ Return the directory under which output needs to be stored. """ - # Support for several projects will be added in TMSS-689, for now catch it. 
- project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in subtask.task_blueprints.all()]) - if len(project_set) != 1: - raise SubtaskSchedulingException('Cannot schedule subtask id=%s because it references task blueprints that belong to different projects=%s' % (subtask.id, project_set)) - - project = list(project_set)[0] - + # Support for several projects will be added in TMSS-689 directory = "/data/%s/%s/L%s" % ("projects" if isProductionEnvironment() else "test-projects", - project, + subtask.project.name, subtask.id) return directory @@ -1372,24 +1354,17 @@ def schedule_observation_subtask(observation_subtask: Subtask): # select correct output for each pointing based on name subtask_output_dict = {} + output = observation_subtask.outputs.first() + if not output: + raise SubtaskSchedulingException('Cannot schedule subtask id=%s because it is missing the output' % (observation_subtask.id,)) - for task_blueprint in observation_subtask.task_blueprints.all(): - output = observation_subtask.outputs.filter(task_blueprint=task_blueprint).first() - if not output: - raise SubtaskSchedulingException('Cannot schedule subtask id=%s because it is missing the output for ' - 'task_blueprint id=%s (subtask has associated task_blueprints=%s, but ' - 'has outputs for task_blueprint=%s' % (observation_subtask.id, - task_blueprint.id, - [(tb.id, tb.specifications_template.type) for tb in observation_subtask.task_blueprints.all()], - [(out.task_blueprint.id, out.task_blueprint.specifications_template.type) for out in observation_subtask.outputs.all()])) - - target_task_spec, calibrator_task_spec = _get_target_and_or_calibrator_specification(task_blueprint) + target_task_spec, calibrator_task_spec = _get_target_and_or_calibrator_specification(observation_subtask.task_blueprint) - if target_task_spec and 'SAPs' in target_task_spec: # target - for sap in target_task_spec['SAPs']: - subtask_output_dict[sap['name']] = output - if calibrator_task_spec 
and 'pointing' in calibrator_task_spec: # calibrator - subtask_output_dict[calibrator_task_spec['name']] = output + target_task_spec, calibrator_task_spec = _get_target_and_or_calibrator_specification(observation_subtask.task_blueprint) + if target_task_spec and 'SAPs' in target_task_spec: # target + for sap in target_task_spec['SAPs']: + subtask_output_dict[sap['name']] = output + if calibrator_task_spec and 'pointing' in calibrator_task_spec: # calibrator + subtask_output_dict[calibrator_task_spec['name']] = output # create SAP objects, as observations create new beams antennaset = specifications_doc['stations']['antenna_set'] @@ -1791,7 +1766,7 @@ def schedule_ingest_subtask(ingest_subtask: Subtask): SubtaskType.Choices.INGEST.value)) # check permission pre-requisites - scheduling_unit_blueprint = ingest_subtask.task_blueprints.first().scheduling_unit_blueprint # first() is fine because we assume an ingest subtask does not serve tasks across SU boundaries + scheduling_unit_blueprint = ingest_subtask.task_blueprint.scheduling_unit_blueprint # a subtask now has (at most) one parent task blueprint, so an ingest subtask cannot serve tasks across SU boundaries if scheduling_unit_blueprint.ingest_permission_required: if scheduling_unit_blueprint.ingest_permission_granted_since is None or scheduling_unit_blueprint.ingest_permission_granted_since > datetime.utcnow(): raise SubtaskSchedulingException("Cannot schedule ingest subtask id=%d because it requires explicit permission and the permission has not been granted (yet)" % (ingest_subtask.pk,)) @@ -2126,18 +2101,17 @@ def get_observation_task_specification_with_check_for_calibrator(subtask): :param: subtask object :return: task_spec: the specifications_doc of the blue print task which is allways a target observation """ - for task_blueprint in subtask.task_blueprints.all(): - if 'calibrator' in task_blueprint.specifications_template.name.lower(): - # Calibrator requires related Target Task Observation for some specifications - target_task_blueprint, _ = get_related_target_observation_task_blueprint(task_blueprint) - if target_task_blueprint is None: - raise 
SubtaskCreationException("Cannot retrieve specifications for subtask id=%d because no related target observation is found " % subtask.pk) - task_spec = target_task_blueprint.specifications_doc - logger.info("Using specifications for calibrator observation (id=%s) from target observation task_blueprint id=%s", - task_blueprint.id, target_task_blueprint.id) - else: - task_spec = task_blueprint.specifications_doc - return task_spec + if 'calibrator' in subtask.task_blueprint.specifications_template.name.lower(): + # Calibrator requires related Target Task Observation for some specifications + target_task_blueprint, _ = get_related_target_observation_task_blueprint(subtask.task_blueprint) + if target_task_blueprint is None: + raise SubtaskCreationException("Cannot retrieve specifications for subtask id=%d because no related target observation is found " % subtask.pk) + task_spec = target_task_blueprint.specifications_doc + logger.info("Using specifications for calibrator observation (id=%s) from target observation task_blueprint id=%s", + subtask.task_blueprint.id, target_task_blueprint.id) + else: + task_spec = subtask.task_blueprint.specifications_doc + return task_spec def cancel_subtask(subtask: Subtask) -> Subtask: diff --git a/SAS/TMSS/backend/test/t_observation_strategies_specification_and_scheduling_test.py b/SAS/TMSS/backend/test/t_observation_strategies_specification_and_scheduling_test.py index 284541b0b272deb9c39a6e1e1c0a579cab2bc59c..fdedcca0fa2cd67693d5c2e978fa32a7a567f04c 100755 --- a/SAS/TMSS/backend/test/t_observation_strategies_specification_and_scheduling_test.py +++ b/SAS/TMSS/backend/test/t_observation_strategies_specification_and_scheduling_test.py @@ -72,11 +72,10 @@ class TestObservationStrategiesSpecificationAndScheduling(unittest.TestCase): '''helper method to fetch the latest statuses of the subtask, its task, and its schedulingunit, and check for the expected statuses''' subtask = self.tmss_client.get_subtask(subtask_id) 
self.assertEqual(expected_subtask_status, subtask['state_value']) - tasks = [self.tmss_client.get_url_as_json_object(task_url) for task_url in subtask['task_blueprints']] - for task in tasks: - self.assertEqual(expected_task_status, task['status']) - schedunit = self.tmss_client.get_url_as_json_object(task['scheduling_unit_blueprint']) - self.assertEqual(expected_schedunit_status, schedunit['status']) + task = self.tmss_client.get_url_as_json_object(subtask['task_blueprint']) + self.assertEqual(expected_task_status, task['status']) + schedunit = self.tmss_client.get_url_as_json_object(task['scheduling_unit_blueprint']) + self.assertEqual(expected_schedunit_status, schedunit['status']) def test_UC1(self): def check_parset(obs_subtask, is_target_obs:bool):