From 18c41fc696392fa0ec9fc1abdeabe51332d8b9c5 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Tue, 24 Oct 2023 11:29:21 +0200 Subject: [PATCH] optimized query to determine if task/subtask is an observation. May help for SDCH-4095 --- .../src/tmss/tmssapp/adapters/feedback.py | 6 +++--- .../backend/src/tmss/tmssapp/adapters/sip.py | 4 ++-- .../src/tmss/tmssapp/models/scheduling.py | 2 +- .../src/tmss/tmssapp/models/specification.py | 8 ++++---- SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py | 20 +++++++++---------- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py index 209a2b19a15..7f84885d425 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py @@ -336,7 +336,7 @@ def process_feedback_into_subtask_properties(subtask:Subtask, feedback: paramete raise SubtaskInvalidStateException("Cannot process feedback for subtask id=%s because the state is '%s' and not '%s'" % (subtask.id, subtask.state.value, SubtaskState.Choices.FINISHING.value)) try: - if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: + if subtask.is_observation: # update actual start/stop times from the feedback doc, template = observation_process_feedback_to_feedback_doc_and_template(feedback) @@ -393,7 +393,7 @@ def process_feedback_into_subtask_dataproducts(subtask:Subtask, feedback: parame logger.info('processing feedback for dataproduct id=%s filename=%s of subtask id=%s feedback: %s', dataproduct.id, dataproduct.filename, subtask.id, single_line_with_single_spaces(str(dp_feedback))) # derive values or collect for different subtask types - if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: + if subtask.is_observation: if dataproduct.datatype.value == "visibilities": dataproduct.feedback_template = DataproductFeedbackTemplate.get_version_or_latest(name='feedback') dataproduct.feedback_doc = observation_correlated_feedback_to_feedback_doc(dp_feedback) @@ -471,7 +471,7 @@ def process_feedback_for_subtask_and_set_to_finished_if_complete(subtask: Subtas def create_feedback_for_observation_subtask_from_specifications(subtask: Subtask) -> parameterset: '''creates a feedback document in parset format from the observation subtask's specification''' - if subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value: + if not subtask.is_observation: raise SubtaskException("Cannot create observation feedback for subtask id=%s of type=%s" % (subtask.id, subtask.specifications_template.type.value)) # must be scheduled or beyond diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py index 75ae8d3c9a8..3fe63c7cdbb 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py @@ -191,7 +191,7 @@ def create_sip_representation_for_subtask(subtask: Subtask, parset_sip_identifie 'parset_identifier': parset_sip_identifier} # determine subtask specific properties and add subtask representation to Sip object - if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: + if subtask.is_observation: # use on_sky_start_time, on_sky_duration for observations process_map_args['starttime'] = subtask.on_sky_start_time process_map_args['duration'] = isodate.duration_isoformat(datetime.timedelta(seconds=round(subtask.on_sky_duration.total_seconds()))) @@ -473,7 +473,7 @@ def create_sip_representation_for_dataproduct(dataproduct: Dataproduct): if "pulp" in dataproduct.dataformat.value: storage_writer = constants.STORAGEWRITERTYPE_UNKNOWN - elif 'samples' in dataproduct.feedback_doc and dataproduct.feedback_doc["samples"]["writer"] == "standard" and dataproduct.dataformat.value == Dataformat.Choices.BEAMFORMED.value and dataproduct.producer.subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: + elif 'samples' in dataproduct.feedback_doc and dataproduct.feedback_doc["samples"]["writer"] == "standard" and dataproduct.dataformat.value == Dataformat.Choices.BEAMFORMED.value and dataproduct.producer.subtask.is_observation: storage_writer = constants.STORAGEWRITERTYPE_HDF5DEFAULT else: storage_writer = storage_writer_map[dataproduct.feedback_doc["samples"]["writer"] if 'samples' in dataproduct.feedback_doc else 'unknown'] # todo: It works for the preprocessing pipelines. Verify also for other types that we can use the feedback_doc here and remove the old method | storage_writer_map[dataproduct.producer.subtask.task_blueprint.specifications_doc.get("storagemanager", 'unknown')], diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py index e5fcd0b6072..c55171cd29a 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py @@ -225,7 +225,7 @@ class Subtask(RefreshFromDbInvalidatesCachedPropertiesMixin, ProjectPropertyMixi @cached_property def is_observation(self) -> bool: '''is this subtask an observation?''' - return self.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value + return Subtask.objects.filter(id=self.id).filter(specifications_template__type__value=SubtaskType.Choices.OBSERVATION.value).exists() @cached_property def stations(self) -> []: diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py index a9104d665c4..c38a5734e0c 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py @@ -700,7 +700,7 @@ class TaskTemplate(AbstractSchemaTemplate): # ToDo: create a technical debt ticket and implement the factory. # rules for observations - if self.type.value == TaskType.Choices.OBSERVATION.value: + if self.is_observation: # check station_groups station_groups = json_doc.get('station_configuration',{}).get('station_groups',[]) for group in station_groups: @@ -1669,17 +1669,17 @@ class TaskCommonPropertiesMixin: @cached_property def is_observation(self) -> bool: '''is this task an observation?''' - return self.specifications_template.type.value == TaskType.Choices.OBSERVATION.value + return self.__class__.objects.filter(id=self.id).filter(specifications_template__type__value=TaskType.Choices.OBSERVATION.value).exists() @cached_property def is_target_observation(self) -> bool: '''is this task a target observation? (also true for combined calibrator/target observations)''' - return self.is_observation and self.specifications_doc is not None and ('correlator' in self.specifications_doc or 'beamformer' in self.specifications_doc) + return self.is_observation and self.__class__.objects.filter(id=self.id).filter(Q(specifications_doc__has_key='correlator') | Q(specifications_doc__has_key='beamformer')).exists() @cached_property def is_calibrator_observation(self) -> bool: '''is this task a calibrator observation? (also true for combined calibrator/target observations)''' - return self.is_observation and self.specifications_doc is not None and 'calibrator' in self.specifications_doc + return self.is_observation and self.__class__.objects.filter(id=self.id).filter(specifications_doc__has_key='calibrator').exists() @cached_property def specified_station_groups(self) -> list: diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 3ecc9b1f688..6fc2f909c29 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -686,7 +686,7 @@ def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskB def create_qafile_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask: - observation_subtasks = [st for st in task_blueprint.subtasks.order_by('id').all() if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value] + observation_subtasks = [st for st in task_blueprint.subtasks.order_by('id').all() if st.is_observation] if not observation_subtasks: raise SubtaskCreationException("Cannot create %s subtask for task_blueprint id=%d because it has no observation subtask(s)" % ( SubtaskType.Choices.QA_FILES.value, task_blueprint.pk)) @@ -703,7 +703,7 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) # step 0: check pre-requisites check_prerequities_for_subtask_creation(observation_subtask.task_blueprint) - if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value: + if not observation_subtask.is_observation: raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % ( SubtaskType.Choices.QA_FILES.value, observation_subtask.pk, observation_subtask.specifications_template.type, SubtaskType.Choices.OBSERVATION.value)) @@ -865,7 +865,7 @@ def create_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint, s # TODO: go more elegant lookup of predecessor observation task # TODO: do not require the input to come from an observation observation_predecessor_tasks = [t for t in task_blueprint.predecessors.all() if any(st for st in t.subtasks.all() - if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value)] + if st.is_observation)] if not observation_predecessor_tasks: raise SubtaskCreationException("Cannot create a subtask for task_blueprint id=%s because it is not connected " "to an observation predecessor (sub)task." % task_blueprint.pk) @@ -1018,7 +1018,7 @@ def schedule_subtask(subtask: Subtask, start_time: datetime=None, misc_unavailab if subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: return schedule_pipeline_subtask(subtask) - if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: + if subtask.is_observation: return schedule_observation_subtask(subtask, misc_unavailable_stations=misc_unavailable_stations) if subtask.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value: @@ -1137,7 +1137,7 @@ def update_start_time_and_shift_successors_until_after_stop_time(subtask: Subtas subtask.scheduled_start_time = start_time subtask.scheduled_stop_time = (subtask.scheduled_start_time + subtask.specified_duration) if start_time else None - if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: + if subtask.is_observation: compute_scheduled_central_lst(subtask) subtask.save() @@ -1460,7 +1460,7 @@ def schedule_observation_subtask(observation_subtask: Subtask, misc_unavailable_ # step 0: check pre-requisites check_prerequities_for_scheduling(observation_subtask) - if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value: + if not observation_subtask.is_observation: raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (observation_subtask.pk, observation_subtask.specifications_template.type, SubtaskType.Choices.OBSERVATION.value)) @@ -1680,7 +1680,7 @@ def schedule_observation_subtask(observation_subtask: Subtask, misc_unavailable_ def compute_scheduled_central_lst(subtask: Subtask) -> Subtask: '''compute and set the scheduled central LST''' - if subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value: + if not subtask.is_observation: raise ValueError("Cannot compute scheduled_central_lst for subtask id=%d type=%s but type should be %s" % (subtask.pk, subtask.specifications_template.type, SubtaskType.Choices.OBSERVATION.value)) @@ -1779,7 +1779,7 @@ def enough_stations_available_for_subtask(subtask: Subtask, remove_reserved_stat def get_missing_stations(subtask: Subtask, lower_bound: datetime=None, upper_bound: datetime=None) -> []: '''Return the list of stations that were requested in the task specification, but which were not available to the scheduled observation.''' # this only makes sense for observations - if subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value: + if not subtask.is_observation: return [] # fetch the requested stations from the spec (without removing the unavailable ones! and not raising!) @@ -2484,7 +2484,7 @@ def cancel_subtask(subtask: Subtask) -> Subtask: if needs_to_kill_subtask: # kill the queued/started subtask, depending on type - if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: + if subtask.is_observation: kill_observation_subtask(subtask) elif subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: kill_pipeline_subtask(subtask) @@ -2530,7 +2530,7 @@ def cancel_subtask_successors(subtask: Subtask): def kill_observation_subtask(subtask: Subtask) -> bool: '''Kill the observation subtask. Return True if actually killed.''' - if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: + if subtask.is_observation: with ObservationControlRPCClient.create() as obs_control_client: return obs_control_client.abort_observation(subtask.id)['aborted'] return False -- GitLab