From fe99dbf4d0b715bb02e4272aa7a7d6dda2ae2b20 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Fri, 5 Jun 2020 13:09:56 +0200 Subject: [PATCH] TMSS-207: raise SubtaskSchedulingException when start/stop times are None. add default start/stop times. --- .../src/tmss/tmssapp/models/scheduling.py | 7 ++ SAS/TMSS/src/tmss/tmssapp/subtasks.py | 89 +++++++++++++++++-- 2 files changed, 89 insertions(+), 7 deletions(-) diff --git a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py index 48d9ca29ff7..66e566e89e7 100644 --- a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py @@ -17,6 +17,8 @@ from django.dispatch import receiver from django.db.models.expressions import RawSQL from lofar.sas.tmss.tmss.tmssapp.validation import validate_json_against_schema +from lofar.sas.tmss.tmss.exceptions import SubtaskSchedulingException +from lofar.common.datetimeutils import formatDatetime from lofar.messaging.messagebus import ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.messaging.messages import EventMessage from lofar.sas.tmss.client.tmssbuslistener import DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX @@ -201,6 +203,11 @@ class Subtask(BasicCommon): if self.specifications_doc and self.specifications_template_id and self.specifications_template.schema: validate_json_against_schema(self.specifications_doc, self.specifications_template.schema) + if self.state.value == SubtaskState.Choices.SCHEDULED.value and self.__original_state.value == SubtaskState.Choices.SCHEDULING.value: + if self.start_time is None or self.stop_time is None: + raise SubtaskSchedulingException("Cannot schedule subtask id=%s when start/stop times are NULL. start_time='%s' stoptime='%s'" % + (self.pk, formatDatetime(self.start_time), formatDatetime(self.stop_time))) + super().save(force_insert, force_update, using, update_fields) # log if either state update or new entry: diff --git a/SAS/TMSS/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/src/tmss/tmssapp/subtasks.py index 18239cbed34..f1859f24fe7 100644 --- a/SAS/TMSS/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/src/tmss/tmssapp/subtasks.py @@ -1,6 +1,7 @@ import logging logger = logging.getLogger(__name__) +from lofar.common.datetimeutils import formatDatetime from lofar.common.json_utils import add_defaults_to_json_object_for_schema, get_default_json_object_for_schema from lofar.sas.tmss.tmss.exceptions import SubtaskCreationException, SubtaskSchedulingException @@ -270,16 +271,31 @@ def schedule_subtask(subtask: Subtask) -> Subtask: '''Generic scheduling method for subtasks. Calls the appropiate scheduling method based on the subtask's type.''' check_prerequities_for_scheduling(subtask) - if subtask.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value: - return schedule_qafile_subtask(subtask) + try: + if subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: + return schedule_pipeline_subtask(subtask) - if subtask.specifications_template.type.value == SubtaskType.Choices.QA_PLOTS.value: - return schedule_qaplots_subtask(subtask) + if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: + return schedule_observation_subtask(subtask) - if subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: - return schedule_pipeline_subtask(subtask) + if subtask.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value: + return schedule_qafile_subtask(subtask) + + if subtask.specifications_template.type.value == SubtaskType.Choices.QA_PLOTS.value: + return schedule_qaplots_subtask(subtask) + + raise SubtaskSchedulingException("Cannot schedule subtask id=%d because there is no schedule-method known for this subtasktype=%s." % (subtask.pk, subtask.specifications_template.type.value)) + except Exception as e: + try: + # set the subtask to state 'ERROR'... + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value) + subtask.save() + except Exception as e2: + logger.error(e2) + finally: + # ... and re-raise the original exception + raise - raise SubtaskSchedulingException("Cannot schedule subtask id=%d because there is no schedule-method known for this subtasktype=%s." % (subtask.pk, subtask.specifications_template.type.value)) def check_prerequities_for_scheduling(subtask: Subtask) -> bool: if subtask.state.value != SubtaskState.Choices.DEFINED.value: @@ -382,6 +398,65 @@ def schedule_qaplots_subtask(qaplots_subtask: Subtask): return qaplots_subtask +def schedule_observation_subtask(observation_subtask: Subtask): + ''' Schedule the given observation_subtask + For first observations in a 'train' of subtasks this method is typically called by hand, or by the short-term-scheduler. + For subsequent observation subtasks this method is typically called by the subtask_scheduling_service upon the predecessor finished event. + This method implements "Scheduling subtasks" step from the "Specification Flow" + https://support.astron.nl/confluence/display/TMSS/Specification+Flow + ''' + # step 0: check pre-requisites + check_prerequities_for_scheduling(observation_subtask) + + if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value: + raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (observation_subtask.pk, + observation_subtask.specifications_template.type, + SubtaskType.Choices.OBSERVATION.value)) + + # step 1: set state to SCHEDULING + observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value) + observation_subtask.save() + + # step 1a: check start/stop times + if observation_subtask.start_time is None: + now = datetime.utcnow() + next_start_time = now + timedelta(minutes=+2, seconds=-now.second, microseconds=-now.microsecond) + logger.info("observation id=%s has no starttime. assigned default: %s", observation_subtask.pk, formatDatetime(next_start_time)) + observation_subtask.start_time = next_start_time + + if observation_subtask.stop_time is None: + stop_time = observation_subtask.start_time + timedelta(minutes=+2) + logger.info("observation id=%s has no stop_time. assigned default: %s", observation_subtask.pk, formatDatetime(stop_time)) + observation_subtask.stop_time = stop_time + + # step 2: define input dataproducts + # TODO: are there any observations that take input dataproducts? + + # step 3: resource assigner + # TODO: implement. Can be skipped for now. + + # step 4: create output dataproducts, and link these to the output + specifications_doc = observation_subtask.specifications_doc + dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="Empty") + dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="Empty") + subtask_output = observation_subtask.outputs.first() # TODO: make proper selection, not default first() + for sb_nr in specifications_doc['stations']['digital_pointings'][0]['subbands']: + Dataproduct.objects.create(filename="L%d_SB%03d_uv.MS" % (observation_subtask.id, sb_nr), + directory="CEP4:/data/test-projects/TMSS_test/L%d/uv/" % (observation_subtask.id,), + dataformat=Dataformat.objects.get(value="MeasurementSet"), + producer=subtask_output, + specifications_doc={}, + specifications_template=dataproduct_specifications_template, + feedback_doc="", + feedback_template=dataproduct_feedback_template) + + # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it) + observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value) + observation_subtask.save() + + return observation_subtask + + def schedule_pipeline_subtask(pipeline_subtask: Subtask): ''' Schedule the given pipeline_subtask This method should typically be called upon the event of an predecessor (observation) subtask being finished. -- GitLab