diff --git a/SAS/TMSS/backend/services/lobster/lib/message_handler.py b/SAS/TMSS/backend/services/lobster/lib/message_handler.py index 06c1adc9b6886cfb52edbb0390184c58be048f1e..4bdb2eb2acc27d1510f770304fbc5ad8d9fb6886 100644 --- a/SAS/TMSS/backend/services/lobster/lib/message_handler.py +++ b/SAS/TMSS/backend/services/lobster/lib/message_handler.py @@ -32,6 +32,8 @@ from copy import copy from time import sleep from datetime import datetime, timedelta from dateutil import parser +from os import system +from tempfile import TemporaryDirectory import logging logger = logging.getLogger(__name__) @@ -97,6 +99,59 @@ class L2TMSSObservationControlMessageHandler(TMSSEventMessageHandler): for subtask in scheduled_observation_subtasks: self.enqueue_scheduled_observation_subtask(subtask) + def _enqueue_observation_on_stations(self, subtask: dict): + l2stationspecs = self.tmss_client.get_subtask_l2stationspecs(subtask_id, retry_count=5) + + # combine stations with same observation_id to create singular + # multistationobservation instances in observation_pool + observations = combine_l2specification_multistation(l2stationspecs) + + # add each observation to the pool and test connectivity + for observation_id, value in observations.items(): + logger.info( + "subtask id=%s has observation=%s for stations=%s with " + "spec=%s", subtask_id, observation_id, value['stations'], + value['specification'] + ) + station_obs = self.observation_pool.create_multistationobservation( + subtask_id, observation_id, value['stations'], value['specification'] + ) + + if not station_obs.all_connected: + logger.warning( + "failed to connect to some of stations=%s during" + "preparation!", value['stations'] + ) + + def _enqueue_observation_on_COBALT(self, subtask: dict): + subtask_id = subtask['id'] + + l2parset = self.tmss_client.get_subtask_parset(subtask_id, retry_count=5) + + # COBALT expects the parset to be on the disk of its head node, and + # then for a script to kickstart the observation process. COBALT + # will then spawn its own processes to prepare and start the observation. + + COBALT_HEADNODE = "cbm299.control.lofar" + COBALT_PARSET_DIR = "/opt/lofar/var/run" + COBALT_STARTBGL_SCRIPT = "/opt/lofar/bin/startBGL.sh" + + # This naming is consistent with earlier systems (OnlineControl) + PARSET_FILENAME= f"CorrProc_{subtask_id}.param" + + with TemporaryDirectory(prefix=f"tmp-cobalt-{subtask_id}-") as tmpdir: + # write parset to disk so we can scp it + with open(f"{tmpdir}/{PARSET_FILENAME}", "w") as parset: + parset.write(l2parset) + + # copy it to COBALT + system(f"scp {tmpdir}/{PARSET_FILENAME} {COBALT_HEADNODE}:${COBALT_PARSET_DIR}/{PARSET_FILENAME}") + + # kickstart the observation on COBALT + # first 3 parameters are historical and ignored + system(f"ssh {COBALT_HEADNODE} {COBALT_STARTBGL_SCRIPT} 1 2 3 {COBALT_PARSET_DIR}/{PARSET_FILENAME} {subtask_id}") + + def enqueue_scheduled_observation_subtask(self, subtask: dict): subtask_id = subtask['id'] @@ -125,28 +180,9 @@ class L2TMSSObservationControlMessageHandler(TMSSEventMessageHandler): self.tmss_client.set_subtask_status(subtask_id, "queueing", retry_count=5) - l2stationspecs = self.tmss_client.get_subtask_l2stationspecs(subtask_id, retry_count=5) + self._enqueue_observation_on_stations(subtask_id) + self._enqueue_observation_on_COBALT(subtask_id) - # Combine stations with same observation_id to create singular - # MultiStationObservation instances in observation_pool - observations = combine_l2specification_multistation(l2stationspecs) - - # Add each observation to the pool and test connectivity - for observation_id, value in observations.items(): - logger.info( - "subtask id=%s has observation=%s for stations=%s with " - "spec=%s", subtask_id, observation_id, value['stations'], - value['specification'] - ) - station_obs = self.observation_pool.create_multistationobservation( - subtask_id, observation_id, value['stations'], value['specification'] - ) - - if not station_obs.all_connected: - logger.warning( - "Failed to connect to some of stations=%s during" - "preparation!", value['stations'] - ) # ToDo: the Jira ticket says we need to check the specifications_doc for # validity. That's superfluous cause TMSS already does that, but it