Skip to content
Snippets Groups Projects
Commit 1de30048 authored by Jorrit Schaap
Browse files

TMSS-207: scu199 has a weird setup, try 5675 first!

parent 8869f14d
No related branches found
No related tags found
1 merge request: !162 "Intermediate merge of TMSS-207 to master"
import os import os
import logging import logging
logger = logging.getLogger() logger = logging.getLogger(__name__)
import kombu import kombu
# make default kombu/amqp logger less spammy # make default kombu/amqp logger less spammy
...@@ -39,7 +39,11 @@ DEFAULT_PORT = -1 ...@@ -39,7 +39,11 @@ DEFAULT_PORT = -1
def broker_url(hostname: str=DEFAULT_BROKER, port: int=DEFAULT_PORT, userid: str=DEFAULT_USER, password :str=DEFAULT_PASSWORD) -> str: def broker_url(hostname: str=DEFAULT_BROKER, port: int=DEFAULT_PORT, userid: str=DEFAULT_USER, password :str=DEFAULT_PASSWORD) -> str:
return 'amqp://%s:%s@%s:%d//' % (userid, password, hostname, port) return 'amqp://%s:%s@%s:%d//' % (userid, password, hostname, port)
for port in [5672, 5675]: possible_ports = [5672, 5675]
if isTestEnvironment():
possible_ports = [5675, 5672] # scu199 has a weird setup, try 5675 first!
for port in possible_ports:
try: try:
logger.debug("trying to connect to broker: hostname=%s port=%s userid=%s password=***", logger.debug("trying to connect to broker: hostname=%s port=%s userid=%s password=***",
DEFAULT_BROKER, port, DEFAULT_USER) DEFAULT_BROKER, port, DEFAULT_USER)
......
...@@ -17,6 +17,8 @@ from django.dispatch import receiver ...@@ -17,6 +17,8 @@ from django.dispatch import receiver
from django.db.models.expressions import RawSQL from django.db.models.expressions import RawSQL
from lofar.sas.tmss.tmss.tmssapp.validation import validate_json_against_schema from lofar.sas.tmss.tmss.tmssapp.validation import validate_json_against_schema
from lofar.sas.tmss.tmss.exceptions import SubtaskSchedulingException
from lofar.common.datetimeutils import formatDatetime
from lofar.messaging.messagebus import ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.messaging.messagebus import ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.messaging.messages import EventMessage from lofar.messaging.messages import EventMessage
from lofar.sas.tmss.client.tmssbuslistener import DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX from lofar.sas.tmss.client.tmssbuslistener import DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX
...@@ -201,6 +203,11 @@ class Subtask(BasicCommon): ...@@ -201,6 +203,11 @@ class Subtask(BasicCommon):
if self.specifications_doc and self.specifications_template_id and self.specifications_template.schema: if self.specifications_doc and self.specifications_template_id and self.specifications_template.schema:
validate_json_against_schema(self.specifications_doc, self.specifications_template.schema) validate_json_against_schema(self.specifications_doc, self.specifications_template.schema)
if self.state.value == SubtaskState.Choices.SCHEDULED.value and self.__original_state.value == SubtaskState.Choices.SCHEDULING.value:
if self.start_time is None or self.stop_time is None:
raise SubtaskSchedulingException("Cannot schedule subtask id=%s when start/stop times are NULL. start_time='%s' stoptime='%s'" %
(self.pk, formatDatetime(self.start_time), formatDatetime(self.stop_time)))
super().save(force_insert, force_update, using, update_fields) super().save(force_insert, force_update, using, update_fields)
# log if either state update or new entry: # log if either state update or new entry:
......
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from lofar.common.datetimeutils import formatDatetime
from lofar.common.json_utils import add_defaults_to_json_object_for_schema, get_default_json_object_for_schema from lofar.common.json_utils import add_defaults_to_json_object_for_schema, get_default_json_object_for_schema
from lofar.sas.tmss.tmss.exceptions import SubtaskCreationException, SubtaskSchedulingException from lofar.sas.tmss.tmss.exceptions import SubtaskCreationException, SubtaskSchedulingException
...@@ -270,16 +271,31 @@ def schedule_subtask(subtask: Subtask) -> Subtask: ...@@ -270,16 +271,31 @@ def schedule_subtask(subtask: Subtask) -> Subtask:
'''Generic scheduling method for subtasks. Calls the appropiate scheduling method based on the subtask's type.''' '''Generic scheduling method for subtasks. Calls the appropiate scheduling method based on the subtask's type.'''
check_prerequities_for_scheduling(subtask) check_prerequities_for_scheduling(subtask)
if subtask.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value: try:
return schedule_qafile_subtask(subtask) if subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value:
return schedule_pipeline_subtask(subtask)
if subtask.specifications_template.type.value == SubtaskType.Choices.QA_PLOTS.value: if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value:
return schedule_qaplots_subtask(subtask) return schedule_observation_subtask(subtask)
if subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: if subtask.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value:
return schedule_pipeline_subtask(subtask) return schedule_qafile_subtask(subtask)
if subtask.specifications_template.type.value == SubtaskType.Choices.QA_PLOTS.value:
return schedule_qaplots_subtask(subtask)
raise SubtaskSchedulingException("Cannot schedule subtask id=%d because there is no schedule-method known for this subtasktype=%s." % (subtask.pk, subtask.specifications_template.type.value))
except Exception as e:
try:
# set the subtask to state 'ERROR'...
subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value)
subtask.save()
except Exception as e2:
logger.error(e2)
finally:
# ... and re-raise the original exception
raise
raise SubtaskSchedulingException("Cannot schedule subtask id=%d because there is no schedule-method known for this subtasktype=%s." % (subtask.pk, subtask.specifications_template.type.value))
def check_prerequities_for_scheduling(subtask: Subtask) -> bool: def check_prerequities_for_scheduling(subtask: Subtask) -> bool:
if subtask.state.value != SubtaskState.Choices.DEFINED.value: if subtask.state.value != SubtaskState.Choices.DEFINED.value:
...@@ -382,6 +398,65 @@ def schedule_qaplots_subtask(qaplots_subtask: Subtask): ...@@ -382,6 +398,65 @@ def schedule_qaplots_subtask(qaplots_subtask: Subtask):
return qaplots_subtask return qaplots_subtask
def schedule_observation_subtask(observation_subtask: Subtask) -> Subtask:
    ''' Schedule the given observation_subtask
    For first observations in a 'train' of subtasks this method is typically called by hand, or by the short-term-scheduler.
    For subsequent observation subtasks this method is typically called by the subtask_scheduling_service upon the predecessor finished event.
    This method implements "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    The subtask is first saved in state SCHEDULING, then gets default start/stop times
    when these are missing, then one output Dataproduct is created per subband, and finally
    the subtask is saved in state SCHEDULED.

    :param observation_subtask: the subtask to schedule; its specifications_template type must be OBSERVATION.
    :return: the same observation_subtask instance, after it was saved in state SCHEDULED.
    :raises SubtaskSchedulingException: when the subtask is not of type OBSERVATION.
            (check_prerequities_for_scheduling presumably raises as well when the subtask is not schedulable -- confirm there.)
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(observation_subtask)

    if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (observation_subtask.pk,
                                                                                                          observation_subtask.specifications_template.type,
                                                                                                          SubtaskType.Choices.OBSERVATION.value))

    # step 1: set state to SCHEDULING
    observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    observation_subtask.save()

    # step 1a: check start/stop times
    # Both times must be filled in before the final save in step 5.
    if observation_subtask.start_time is None:
        now = datetime.utcnow()
        # default start: two minutes from now, truncated to the whole minute
        next_start_time = now + timedelta(minutes=+2, seconds=-now.second, microseconds=-now.microsecond)
        logger.info("observation id=%s has no starttime. assigned default: %s", observation_subtask.pk, formatDatetime(next_start_time))
        observation_subtask.start_time = next_start_time

    if observation_subtask.stop_time is None:
        # default duration: two minutes after the (possibly just assigned) start time
        stop_time = observation_subtask.start_time + timedelta(minutes=+2)
        logger.info("observation id=%s has no stop_time. assigned default: %s", observation_subtask.pk, formatDatetime(stop_time))
        observation_subtask.stop_time = stop_time

    # step 2: define input dataproducts
    # TODO: are there any observations that take input dataproducts?

    # step 3: resource assigner
    # TODO: implement. Can be skipped for now.

    # step 4: create output dataproducts, and link these to the output
    specifications_doc = observation_subtask.specifications_doc
    dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="Empty")
    dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="Empty")
    subtask_output = observation_subtask.outputs.first()  # TODO: make proper selection, not default first()

    # The dataformat is the same for every dataproduct, so look it up once
    # instead of issuing the same query again for each subband.
    measurementset_dataformat = Dataformat.objects.get(value="MeasurementSet")
    output_directory = "CEP4:/data/test-projects/TMSS_test/L%d/uv/" % (observation_subtask.id,)

    # NOTE(review): only the subbands of the first digital pointing are used here -- confirm this is intended.
    for sb_nr in specifications_doc['stations']['digital_pointings'][0]['subbands']:
        Dataproduct.objects.create(filename="L%d_SB%03d_uv.MS" % (observation_subtask.id, sb_nr),
                                   directory=output_directory,
                                   dataformat=measurementset_dataformat,
                                   producer=subtask_output,
                                   specifications_doc={},
                                   specifications_template=dataproduct_specifications_template,
                                   feedback_doc="",
                                   feedback_template=dataproduct_feedback_template)

    # step 5: set state to SCHEDULED
    # NOTE(review): the original comment said this results in "the qaservice" picking the subtask up and running it;
    # that wording looks copied from the QA scheduling methods -- confirm which service handles scheduled observations.
    observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    observation_subtask.save()

    return observation_subtask
def schedule_pipeline_subtask(pipeline_subtask: Subtask): def schedule_pipeline_subtask(pipeline_subtask: Subtask):
''' Schedule the given pipeline_subtask ''' Schedule the given pipeline_subtask
This method should typically be called upon the event of an predecessor (observation) subtask being finished. This method should typically be called upon the event of an predecessor (observation) subtask being finished.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment