diff --git a/LCS/Messaging/python/messaging/config.py b/LCS/Messaging/python/messaging/config.py index 417a0ea4c5063a932be2535b7c2eac6e405177a0..9cf36e6dc0d62a8fb3cc7de6550bbf91c0126f3b 100644 --- a/LCS/Messaging/python/messaging/config.py +++ b/LCS/Messaging/python/messaging/config.py @@ -1,6 +1,6 @@ import os import logging -logger = logging.getLogger() +logger = logging.getLogger(__name__) import kombu # make default kombu/amqp logger less spammy @@ -39,7 +39,11 @@ DEFAULT_PORT = -1 def broker_url(hostname: str=DEFAULT_BROKER, port: int=DEFAULT_PORT, userid: str=DEFAULT_USER, password :str=DEFAULT_PASSWORD) -> str: return 'amqp://%s:%s@%s:%d//' % (userid, password, hostname, port) -for port in [5672, 5675]: +possible_ports = [5672, 5675] +if isTestEnvironment(): + possible_ports = [5675, 5672] # scu199 has a weird setup, try 5675 first! + +for port in possible_ports: try: logger.debug("trying to connect to broker: hostname=%s port=%s userid=%s password=***", DEFAULT_BROKER, port, DEFAULT_USER) diff --git a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py index 48d9ca29ff7bd86466326ddeb51c47ec36c9f4e3..66e566e89e7744d01eadbec504a9644629742359 100644 --- a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py @@ -17,6 +17,8 @@ from django.dispatch import receiver from django.db.models.expressions import RawSQL from lofar.sas.tmss.tmss.tmssapp.validation import validate_json_against_schema +from lofar.sas.tmss.tmss.exceptions import SubtaskSchedulingException +from lofar.common.datetimeutils import formatDatetime from lofar.messaging.messagebus import ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.messaging.messages import EventMessage from lofar.sas.tmss.client.tmssbuslistener import DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX @@ -201,6 +203,11 @@ class Subtask(BasicCommon): if self.specifications_doc and self.specifications_template_id and self.specifications_template.schema: validate_json_against_schema(self.specifications_doc, self.specifications_template.schema) + if self.state.value == SubtaskState.Choices.SCHEDULED.value and self.__original_state.value == SubtaskState.Choices.SCHEDULING.value: + if self.start_time is None or self.stop_time is None: + raise SubtaskSchedulingException("Cannot schedule subtask id=%s when start/stop times are NULL. start_time='%s' stoptime='%s'" % + (self.pk, formatDatetime(self.start_time), formatDatetime(self.stop_time))) + super().save(force_insert, force_update, using, update_fields) # log if either state update or new entry: diff --git a/SAS/TMSS/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/src/tmss/tmssapp/subtasks.py index 18239cbed34c20f26544db05a90af01ac37246ba..f1859f24fe73a5a3f8925e4818fb1b64a73f1028 100644 --- a/SAS/TMSS/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/src/tmss/tmssapp/subtasks.py @@ -1,6 +1,7 @@ import logging logger = logging.getLogger(__name__) +from lofar.common.datetimeutils import formatDatetime from lofar.common.json_utils import add_defaults_to_json_object_for_schema, get_default_json_object_for_schema from lofar.sas.tmss.tmss.exceptions import SubtaskCreationException, SubtaskSchedulingException @@ -270,16 +271,31 @@ def schedule_subtask(subtask: Subtask) -> Subtask: '''Generic scheduling method for subtasks. Calls the appropiate scheduling method based on the subtask's type.''' check_prerequities_for_scheduling(subtask) - if subtask.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value: - return schedule_qafile_subtask(subtask) + try: + if subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: + return schedule_pipeline_subtask(subtask) - if subtask.specifications_template.type.value == SubtaskType.Choices.QA_PLOTS.value: - return schedule_qaplots_subtask(subtask) + if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: + return schedule_observation_subtask(subtask) - if subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: - return schedule_pipeline_subtask(subtask) + if subtask.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value: + return schedule_qafile_subtask(subtask) + + if subtask.specifications_template.type.value == SubtaskType.Choices.QA_PLOTS.value: + return schedule_qaplots_subtask(subtask) + + raise SubtaskSchedulingException("Cannot schedule subtask id=%d because there is no schedule-method known for this subtasktype=%s." % (subtask.pk, subtask.specifications_template.type.value)) + except Exception as e: + try: + # set the subtask to state 'ERROR'... + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value) + subtask.save() + except Exception as e2: + logger.error(e2) + finally: + # ... and re-raise the original exception + raise - raise SubtaskSchedulingException("Cannot schedule subtask id=%d because there is no schedule-method known for this subtasktype=%s." % (subtask.pk, subtask.specifications_template.type.value)) def check_prerequities_for_scheduling(subtask: Subtask) -> bool: if subtask.state.value != SubtaskState.Choices.DEFINED.value: @@ -382,6 +398,65 @@ def schedule_qaplots_subtask(qaplots_subtask: Subtask): return qaplots_subtask +def schedule_observation_subtask(observation_subtask: Subtask): + ''' Schedule the given observation_subtask + For first observations in a 'train' of subtasks this method is typically called by hand, or by the short-term-scheduler. + For subsequent observation subtasks this method is typically called by the subtask_scheduling_service upon the predecessor finished event. + This method implements "Scheduling subtasks" step from the "Specification Flow" + https://support.astron.nl/confluence/display/TMSS/Specification+Flow + ''' + # step 0: check pre-requisites + check_prerequities_for_scheduling(observation_subtask) + + if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value: + raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (observation_subtask.pk, + observation_subtask.specifications_template.type, + SubtaskType.Choices.OBSERVATION.value)) + + # step 1: set state to SCHEDULING + observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value) + observation_subtask.save() + + # step 1a: check start/stop times + if observation_subtask.start_time is None: + now = datetime.utcnow() + next_start_time = now + timedelta(minutes=+2, seconds=-now.second, microseconds=-now.microsecond) + logger.info("observation id=%s has no starttime. assigned default: %s", observation_subtask.pk, formatDatetime(next_start_time)) + observation_subtask.start_time = next_start_time + + if observation_subtask.stop_time is None: + stop_time = observation_subtask.start_time + timedelta(minutes=+2) + logger.info("observation id=%s has no stop_time. assigned default: %s", observation_subtask.pk, formatDatetime(stop_time)) + observation_subtask.stop_time = stop_time + + # step 2: define input dataproducts + # TODO: are there any observations that take input dataproducts? + + # step 3: resource assigner + # TODO: implement. Can be skipped for now. + + # step 4: create output dataproducts, and link these to the output + specifications_doc = observation_subtask.specifications_doc + dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="Empty") + dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="Empty") + subtask_output = observation_subtask.outputs.first() # TODO: make proper selection, not default first() + for sb_nr in specifications_doc['stations']['digital_pointings'][0]['subbands']: + Dataproduct.objects.create(filename="L%d_SB%03d_uv.MS" % (observation_subtask.id, sb_nr), + directory="CEP4:/data/test-projects/TMSS_test/L%d/uv/" % (observation_subtask.id,), + dataformat=Dataformat.objects.get(value="MeasurementSet"), + producer=subtask_output, + specifications_doc={}, + specifications_template=dataproduct_specifications_template, + feedback_doc="", + feedback_template=dataproduct_feedback_template) + + # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it) + observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value) + observation_subtask.save() + + return observation_subtask + + def schedule_pipeline_subtask(pipeline_subtask: Subtask): ''' Schedule the given pipeline_subtask This method should typically be called upon the event of an predecessor (observation) subtask being finished.