diff --git a/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py b/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py index c67c38b455494755cff76d923dba6988ab019c9a..a98f02f38a4f79c655ae22f68150a5d477fe14b6 100644 --- a/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py +++ b/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py @@ -13,6 +13,7 @@ from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME, UsingToBusMixin from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC +from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.common import dbcredentials logger = logging.getLogger(__name__) @@ -29,21 +30,38 @@ class OTDBtoRATaskStatusPropagator(UsingToBusMixin, OTDBEventMessageHandler): super().init_tobus(exchange, broker) self.otdb = OTDBRPC.create(exchange=exchange, broker=broker) self.radb = RADBRPC.create(exchange=exchange, broker=broker) + self.momrpc = MoMQueryRPC.create(exchange=exchange, broker=broker) def start_handling(self): self.otdb.open() self.radb.open() + self.momrpc.open() super().start_handling() def stop_handling(self): self.otdb.close() self.radb.close() + self.momrpc.close() super().stop_handling() def _update_radb_task_status(self, otdb_id, task_status): try: + # get current task (with current status) + radb_task = self.radb.getTask(otdb_id=otdb_id) + + if task_status == 'aborted': + if radb_task['status'] == 'completing': + # fix for issue: https://support.astron.nl/jira/browse/ROHD-2538 + mom_task_details = self.momrpc.getObjectDetails(radb_task['mom_id'])[radb_task['mom_id']] + if mom_task_details['project_name'].lower() == 'lc14_002': + logger.info("overriding incorrect 'aborted' status to 'finished' for otdb_id=%s of project %s via OTDB", otdb_id, mom_task_details['project_name']) + # override the faulty aborted status by setting the status in otdb, which then reports via the messagebus to mom/RA/etc, so all systems are again in sync. + self.otdb.taskSetStatus(otdb_id=otdb_id, new_status="finished") + + # otdb adjusts stoptime when aborted, + self._updateStopTime(otdb_id) + if task_status in ['approved', 'prescheduled', 'obsolete']: - radb_task = self.radb.getTask(otdb_id=otdb_id) if (radb_task and radb_task['status'] in ['queued', 'active', 'completing']): # set task to aborted first, so other controls (e.g. pipelinecontrol) # can respond to the aborted event @@ -254,9 +272,6 @@ class OTDBtoRATaskStatusPropagator(UsingToBusMixin, OTDBEventMessageHandler): def onObservationAborted(self, treeId, modificationTime): self._update_radb_task_status(treeId, 'aborted') - # otdb adjusts stoptime when aborted, - self._updateStopTime(treeId) - def main(): logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)