diff --git a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py index f4cbf84e4d70369e99d59c3ee58ee2bf8b9fab26..94cf85480b07a45a381efa18c2dc599ea8cd663f 100755 --- a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py @@ -169,7 +169,7 @@ class TestDynamicScheduling(TestCase): # Note: we use django.test.TestCase inst scheduling_unit_blueprint_medium, scheduling_unit_blueprint_high)).all() self.assertEqual(1, upcoming_scheduled_subtasks.count()) - self.assertEqual(scheduling_unit_blueprint_high.id, upcoming_scheduled_subtasks[0].task_blueprints().first().scheduling_unit_blueprint.id) # todo: first? + self.assertEqual(scheduling_unit_blueprint_high.id, upcoming_scheduled_subtasks[0].task_blueprints().first().scheduling_unit_blueprint.id) # all task blueprints share same SU, so it does not matter which one we check # check scheduling_unit_blueprint_low starts after the scheduled scheduling_unit_blueprint_high self.assertGreater(scheduling_unit_blueprint_low.start_time, scheduling_unit_blueprint_medium.start_time) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py index dc82f995c1bbf912ef659044077139e86fd0f136..eddee3ceebe441d90cfef401d152a1ff4363e776 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py @@ -294,7 +294,10 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas parset["Observation.tmssID"] = subtask.pk parset["Observation.processType"] = subtask.specifications_template.type.value.capitalize() parset["Observation.processSubtype"] = "Beam Observation" - parset["Observation.Campaign.name"] = subtask.task_blueprints.first().scheduling_unit_blueprint.draft.scheduling_set.project.name # todo: first? 
+ project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in subtask.task_blueprints.all()]) + if len(project_set) != 1: + raise ConversionException('Subtask id=%s cannot be converted to parset because it references task blueprint that belong to different projects=%s' % (subtask.id, project_set)) + parset["Observation.Campaign.name"] = list(project_set)[0] parset["Observation.startTime"] = formatDatetime(subtask.start_time) if isinstance(subtask.start_time, datetime) else subtask.start_time parset["Observation.stopTime"] = formatDatetime(subtask.stop_time) if isinstance(subtask.stop_time, datetime) else subtask.stop_time parset["Observation.strategy"] = "default" # maybe not mandatory? @@ -414,8 +417,11 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) parset["Observation.processSubtype"] = "Averaging Pipeline" parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "preprocessing_pipeline.py" parset["Observation.ObservationControl.PythonControl.softwareVersion"] = "" - parset["Observation.Campaign.name"] = subtask.task_blueprints.first().scheduling_unit_blueprint.draft.scheduling_set.project.name # todo: first? - parset["Observation.Scheduler.taskName"] = subtask.task_blueprints.first().name # todo: first? + project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in subtask.task_blueprints.all()]) + if len(project_set) != 1: + raise ConversionException('Subtask pk=%s cannot be converted to parset because it references task blueprint that belong to different projects (names=%s)' % (subtask.pk, project_set)) + parset["Observation.Campaign.name"] = list(project_set)[0] + parset["Observation.Scheduler.taskName"] = subtask.task_blueprints.first().name # Scheduler keys are artefacts of an older time. 
Their content is deprecated, so we don't care which task we take this from parset["Observation.Scheduler.predecessors"] = [] parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster.name parset["Observation.Cluster.ProcessingCluster.clusterPartition"] = 'cpu' diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py index 9563790f65f2396584ca5ff8ed81528e52d05a68..ff3267b32e3a95700df1917a45dfc90b6c04fa95 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py @@ -143,7 +143,8 @@ def create_sip_representation_for_subtask(subtask: Subtask): # determine subtask specific properties and add subtask representation to Sip object if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: - subarraypointings=None # todo, subtask.specifications_doc, probably more complex than it looks -> RGOE yes complex type for later -> JK: assuming this is done in TMSS-308? + subarraypointings = None # todo, subtask.specifications_doc, probably more complex than it looks -> RGOE yes complex type for later -> JK: assuming this is done in TMSS-308? + concatenated_task_descriptions = "\n".join([tb.description for tb in subtask.task_blueprints.order_by("specifications_template__name").all()]) # we could also order by "specifications_template__type__value"?
observation = siplib.Observation(observingmode=constants.OBSERVINGMODETYPE_BEAM_OBSERVATION, # can be hardcoded for an observation instrumentfilter=mapping_filterset_type_TMSS_2_SIP[subtask.specifications_doc['stations']['filter']], clock_frequency="200", # fixed, @@ -161,7 +162,7 @@ def create_sip_representation_for_subtask(subtask: Subtask): process_map=process_map, channelwidth_frequency=None, # NA any more ('BlueGene compatibility' see comment in LTA-SIP.xsd) channelwidth_frequencyunit=constants.FREQUENCYUNIT_HZ, # fixed - observationdescription=subtask.task_blueprints.first().description, # todo: first? + observationdescription=concatenated_task_descriptions, channelspersubband=0, # NA any more ('BlueGene compatibility' see comment in LTA-SIP.xsd) subarraypointings=subarraypointings, transientbufferboardevents=None # fixed @@ -174,9 +175,11 @@ def create_sip_representation_for_subtask(subtask: Subtask): sourcedata_identifiers += [get_siplib_identifier(dp.global_identifier, "Dataproduct id=%s" % dp.id) for dp in input.dataproducts.all()] # todo: use correct id, lookup based on TMSS reference or so, tbd if not sourcedata_identifiers: raise TMSSException("There seems to be no subtask input associated to your pipeline subtask id %s. Please define what data the pipeline processed." % subtask.id) + if subtask.task_blueprints.count() > 1: + raise TMSSException("There are several task blueprints pk=%s associated to subtask pk=%s, but for pipelines, only a single task is supported." % ([tb.pk for tb in subtask.task_blueprints.all()], subtask.pk)) pipeline_map = siplib.PipelineMap( - name=subtask.task_blueprints.first().name, # todo: first? + name=subtask.task_blueprints.first().name, # there is only one version='unknown', # todo from subtask.specifications_doc? from feedback (we have feedback and storagewriter versions there, not pipeline version or sth)? 
sourcedata_identifiers=sourcedata_identifiers, process_map=process_map) @@ -187,8 +190,8 @@ def create_sip_representation_for_subtask(subtask: Subtask): numberofcorrelateddataproducts=get_number_of_dataproducts_of_type(subtask, Dataformat.Choices.MEASUREMENTSET.value), frequencyintegrationstep=subtask.specifications_doc.get('demixer',{}).get('frequency_steps', 0), timeintegrationstep=subtask.specifications_doc.get('demixer',{}).get('time_step', 0), - flagautocorrelations=subtask.task_blueprints.first().specifications_doc["flag"]["autocorrelations"], # todo: first? - demixing=True if 'demix' in subtask.task_blueprints.first().specifications_doc else False # todo: first? + flagautocorrelations=True if subtask.specifications_doc.get('preflagger1',{}).get('corrtype','')=="auto" else False, + demixing=True if 'demixer' in subtask.specifications_doc else False ) # todo: distinguish and create other pipeline types. Probably most of these can be filled in over time as needed, # but they are not required for UC1. Here are stubs to start from for the other types the LTA supports: @@ -278,7 +281,7 @@ def create_sip_representation_for_dataproduct(dataproduct: Dataproduct): logger.warning("Could not determine the type of dataproduct id %s (%s). Falling back to %s" % (dataproduct.id, err, dataproduct_type)) try: - dataproduct_fileformat = fileformat_map[dataproduct.producer.subtask.task_blueprints.first().consumed_by.first().dataformat.value] # todo same as with type? Why is this not with the data? Why is this so different from the LTA datamodel? # todo: first? + dataproduct_fileformat = fileformat_map[dataproduct.dataformat.value] # todo same as with type? Why is this not with the data? Why is this so different from the LTA datamodel? except Exception as err: dataproduct_fileformat = constants.FILEFORMATTYPE_UNDOCUMENTED logger.warning("Could not determine the type of dataproduct id %s (%s).
Falling back to %s" % (dataproduct.id, err, dataproduct_fileformat)) @@ -463,7 +466,11 @@ def generate_sip_for_dataproduct(dataproduct: Dataproduct) -> siplib.Sip: sip_dataproduct = create_sip_representation_for_dataproduct(dataproduct) # Gather project details - project = dataproduct.producer.subtask.task_blueprints.first().scheduling_unit_blueprint.draft.scheduling_set.project # todo: first? + project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in dataproduct.producer.subtask.task_blueprints.all()]) + if len(project_set) != 1: + # todo: support for multiple projects needs to be picked up in TMSS-689 + raise TMSSException('Dataproduct pk=%s references task blueprints that belong to different projects (names=%s). This can currently not be represented in SIP format.' % (dataproduct.pk, project_set)) + project = dataproduct.producer.subtask.task_blueprints.first().scheduling_unit_blueprint.draft.scheduling_set.project # there must be only one task blueprint project_code = project.name project_primaryinvestigator = 'project_primaryinvestigator' project_contactauthor = 'project_contactauthor' diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py index 9451ac7618f5e866bfb8388aa7d665ce0596195b..df86f35dc6778e95077ad5661e63108171a84c8d 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py @@ -174,11 +174,13 @@ class Subtask(BasicCommon): '''get the specified (or estimated) duration of this subtask based on the specified task duration and the subtask type''' if self.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: # observations have a specified duration, so grab it from the spec. - return timedelta(seconds=self.task_blueprints.first().specifications_doc.get('duration', 0)) # todo: first?
+ # In case we have several associated tasks: use the longest duration, since we assume that tasks will run in parallel (there would be no reason to combine them into a subtask). + return timedelta(seconds=max([tb.specifications_doc.get('duration', 0) for tb in self.task_blueprints.all()])) if self.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: # pipelines usually do not have a specified duration, so make a guess (half the obs duration?). - return timedelta(seconds=self.task_blueprints.first().specifications_doc.get('duration', max(p.specified_duration.total_seconds() for p in self.predecessors)/2)) # todo: first? + # In case we have several associated tasks: this guess is probably in no way accurate anyway, so we assume it does not really matter which task blueprint we refer to here + return timedelta(seconds=self.task_blueprints.first().specifications_doc.get('duration', max(p.specified_duration.total_seconds() for p in self.predecessors)/2)) # other subtasktypes usually depend on cpu/data/network etc. 
So, make a guess (for now) return timedelta(minutes=5) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index b759c70ec9eef357cff2a63b6c870eedaa2e38cc..ab2ddaaa635b26993a6742adbc2a51a9ae91d577 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -501,12 +501,15 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) # step 2: create and link subtask input/output selection_template = TaskRelationSelectionTemplate.objects.get(name="all") selection_doc = get_default_json_object_for_schema(selection_template.schema) - qafile_subtask_input = SubtaskInput.objects.create(subtask=qafile_subtask, - producer=observation_subtask.outputs.first(), # TODO: determine proper producer based on spec in task_relation_blueprint - selection_doc=selection_doc, - selection_template=selection_template) - qafile_subtask_output = SubtaskOutput.objects.create(subtask=qafile_subtask, - task_blueprint=observation_subtask.task_blueprints.first()) # todo: first? 
+ + qafile_subtask_input = SubtaskInput.objects.create(subtask=qafile_subtask, + producer=observation_subtask.outputs.first(), # TODO: determine proper producer based on spec in task_relation_blueprint + selection_doc=selection_doc, + selection_template=selection_template) + + for tb in observation_subtask.task_blueprints.all(): + qafile_subtask_output = SubtaskOutput.objects.create(subtask=qafile_subtask, + task_blueprint=tb) # step 3: set state to DEFINED qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) diff --git a/SAS/TMSS/backend/test/t_scheduling.py b/SAS/TMSS/backend/test/t_scheduling.py index 0c58b2c36935eb7571c63bb78a14118c37649ab1..9d2e69bf5d6a5878dccf03ea311a776ce2e716ed 100755 --- a/SAS/TMSS/backend/test/t_scheduling.py +++ b/SAS/TMSS/backend/test/t_scheduling.py @@ -321,20 +321,20 @@ class SchedulingTest(unittest.TestCase): ingest_subtask_data = test_data_creator.Subtask(specifications_template_url=ingest_subtask_template['url'], specifications_doc=ingest_spec, - task_blueprint_urls=obs_subtask['task_blueprints'], # todo [0]? + task_blueprint_urls=obs_subtask['task_blueprints'], cluster_url=cluster_url) ingest_subtask = test_data_creator.post_data_and_get_response_as_json_object(ingest_subtask_data, '/subtask/') # ...and connect it to the observation test_data_creator.post_data_and_get_url(test_data_creator.SubtaskInput(subtask_url=ingest_subtask['url'], subtask_output_url=obs_subtask_output_url), '/subtask_input/') test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=ingest_subtask['url'], - task_blueprint_url=obs_subtask['task_blueprints'][0]), '/subtask_output/') # todo: correct to link this to the obs?
+ task_blueprint_url=obs_subtask['task_blueprints'][0]), '/subtask_output/') # our subtask here has only one known related task for predecessor in client.get_subtask_predecessors(ingest_subtask['id']): client.set_subtask_status(predecessor['id'], 'finished') client.set_subtask_status(ingest_subtask['id'], 'defined') - task_blueprint = client.get_url_as_json_object(ingest_subtask['task_blueprints'][0]) # todo: [0]? + task_blueprint = client.get_url_as_json_object(ingest_subtask['task_blueprints'][0]) # our subtask here has only one known related task schedulingunit_blueprint = client.get_url_as_json_object(task_blueprint['scheduling_unit_blueprint']) # first, make sure we need but do not have ingest persmission... diff --git a/SAS/TMSS/backend/test/test_utils.py b/SAS/TMSS/backend/test/test_utils.py index 1b139afd70fd385255dbf0054a35ed5d4511499f..cbc1e8d9119ecb835d790764f54c178166a7c6ee 100644 --- a/SAS/TMSS/backend/test/test_utils.py +++ b/SAS/TMSS/backend/test/test_utils.py @@ -699,7 +699,7 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int self.create_output_dataproducts = create_output_dataproducts def need_to_handle(self, subtask: models.Subtask) -> bool: - if subtask.task_blueprints.first().scheduling_unit_blueprint.id != self.scheduling_unit_blueprint_id: # todo: first? + if self.scheduling_unit_blueprint_id not in [tb.scheduling_unit_blueprint.id for tb in subtask.task_blueprints.all()]: return False if subtask.specifications_template.type.value == models.SubtaskType.Choices.OBSERVATION.value and not self.handle_observations: @@ -828,7 +828,7 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int output_dp.feedback_doc = feedback_doc output_dp.save() elif subtask.specifications_template.type.value == models.SubtaskType.Choices.INGEST.value: - project_name = subtask.task_blueprints.first().draft.scheduling_unit_draft.scheduling_set.project.name # todo: first?
+ project_name = subtask.task_blueprints.first().draft.scheduling_unit_draft.scheduling_set.project.name # todo: support for multiple projects needs to be picked up in TMSS-689 for output_dp in subtask.output_dataproducts: try: