Skip to content
Snippets Groups Projects
Commit ae68c88b authored by Auke Klazema's avatar Auke Klazema
Browse files

SW-705: Fix test and RATaskSpecified together with schedulechecker

parent 92830750
No related branches found
No related tags found
No related merge requests found
...@@ -21,15 +21,15 @@ ...@@ -21,15 +21,15 @@
# #
# $Id$ # $Id$
""" """
Daemon that listens to specific OTDB status changes, requests the parset of such jobs including their predecessors, and Daemon that listens to specific OTDB status changes, requests the parset of such jobs including
posts them on the bus. their predecessors, and posts them on the bus.
""" """
from lofar.messaging import ToBus, EventMessage, DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.messaging import ToBus, EventMessage, DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.sas.otdb.OTDBBusListener import OTDBEventMessageHandler, OTDBBusListener from lofar.sas.otdb.OTDBBusListener import OTDBEventMessageHandler, OTDBBusListener
from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT from lofar.sas.resourceassignment.rataskspecified.config import \
DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT
from lofar.sas.resourceassignment.common.specification import Specification from lofar.sas.resourceassignment.common.specification import Specification
from lofar.messaging import DEFAULT_BUSNAME, DEFAULT_BROKER
from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
...@@ -41,8 +41,8 @@ logger = logging.getLogger(__name__) ...@@ -41,8 +41,8 @@ logger = logging.getLogger(__name__)
class RATaskSpecifiedOTDBEventMessageHandler(OTDBEventMessageHandler): class RATaskSpecifiedOTDBEventMessageHandler(OTDBEventMessageHandler):
def __init__(self, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): def __init__(self, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
""" """
:param radb_exchange: name of the bus on which the radb service listens (default: lofar.ra.command) :param exchange: name of the exchange to listen on.
:param radb_servicename: name of the radb service (default: RADBService) :param broker: name of the broker to connect to.
""" """
super().__init__() super().__init__()
self.otdbrpc = OTDBRPC.create(exchange=exchange, broker=broker) self.otdbrpc = OTDBRPC.create(exchange=exchange, broker=broker)
...@@ -50,36 +50,47 @@ class RATaskSpecifiedOTDBEventMessageHandler(OTDBEventMessageHandler): ...@@ -50,36 +50,47 @@ class RATaskSpecifiedOTDBEventMessageHandler(OTDBEventMessageHandler):
self.momrpc = MoMQueryRPC.create(exchange=exchange, broker=broker) self.momrpc = MoMQueryRPC.create(exchange=exchange, broker=broker)
self.send_bus = ToBus(exchange=exchange, broker=broker) self.send_bus = ToBus(exchange=exchange, broker=broker)
def start_handling(self):
self.otdbrpc.open()
self.momrpc.open()
self.radbrpc.open()
self.send_bus.open()
def stop_handling(self):
self.otdbrpc.close()
self.momrpc.close()
self.radbrpc.close()
self.send_bus.close()
# This is mainly to trigger the propagation of misc field values through read_from_mom # This is mainly to trigger the propagation of misc field values through read_from_mom
# and then sending them to the RA to OTDB Service in the resource assigner. # and then sending them to the RA to OTDB Service in the resource assigner.
# Might need to be a separate service if we take on more mom-otdb-adapter function. # Might need to be a separate service if we take on more mom-otdb-adapter function.
def onObservationApproved(self, main_id, modificationTime): def onObservationApproved(self, main_id, modification_time):
self.createAndSendSpecifiedTask(main_id, 'approved') self.createAndSendSpecifiedTask(main_id, 'approved')
def onObservationPrescheduled(self, main_id, modificationTime): def onObservationPrescheduled(self, main_id, modification_time):
self.createAndSendSpecifiedTask(main_id, 'prescheduled') self.createAndSendSpecifiedTask(main_id, 'prescheduled')
def createAndSendSpecifiedTask(self, main_id, status): def createAndSendSpecifiedTask(self, main_id, status):
with self.otdbrpc, self.radbrpc, self.momrpc: spec = Specification(self.otdbrpc, self.momrpc, self.radbrpc)
spec = Specification(logger,
exchange=self.send_bus.exchange,
broker=self.send_bus.broker)
spec.status = status spec.status = status
spec.read_from_OTDB_with_predecessors(main_id, "otdb", {}) spec.read_from_OTDB_with_predecessors(main_id, "otdb", {})
spec.read_from_mom() spec.read_from_mom()
spec.update_start_end_times() spec.update_start_end_times()
# spec.insert_into_radb() is still done in resource_assigner for now. # spec.insert_into_radb() is still done in resource_assigner for now.
resultTree = spec.as_dict() result_tree = spec.as_dict()
if spec.status == status: if spec.status == status:
logger.info("Sending result: %s" % resultTree) logger.info("Sending result: %s" % result_tree)
# Put result on bus # Put result on bus
msg = EventMessage(subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT, content=resultTree) msg = EventMessage(subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT,
content=result_tree)
with self.send_bus: with self.send_bus:
self.send_bus.send(msg) self.send_bus.send(msg)
logger.info("Result sent") logger.info("Result sent")
else: else:
logger.warning("Problem retrieving task %i" % main_id) logger.warning("Problem retrieving task %i" % main_id)
def main(): def main():
# make sure we run in UTC timezone # make sure we run in UTC timezone
import os import os
...@@ -97,7 +108,8 @@ def main(): ...@@ -97,7 +108,8 @@ def main():
parser.add_option("-e", "--exchange", dest="exchange", type="string", parser.add_option("-e", "--exchange", dest="exchange", type="string",
default=DEFAULT_BUSNAME, default=DEFAULT_BUSNAME,
help="Bus or queue to communicate on. [default: %default]") help="Bus or queue to communicate on. [default: %default]")
parser.add_option('-b', '--broker', dest='broker', type='string', default=None, help='Address of the broker, default: localhost') parser.add_option('-b', '--broker', dest='broker', type='string', default=None,
help='Address of the broker, default: localhost')
(options, args) = parser.parse_args() (options, args) = parser.parse_args()
with OTDBBusListener(handler_type=RATaskSpecifiedOTDBEventMessageHandler, with OTDBBusListener(handler_type=RATaskSpecifiedOTDBEventMessageHandler,
......
...@@ -21,11 +21,10 @@ ...@@ -21,11 +21,10 @@
# #
import logging import logging
from datetime import datetime, timedelta
from time import sleep from time import sleep
from threading import Thread from threading import Thread
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC
from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.sas.otdb.otdbrpc import OTDBRPC
...@@ -36,6 +35,7 @@ from lofar.common.datetimeutils import * ...@@ -36,6 +35,7 @@ from lofar.common.datetimeutils import *
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None): def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None):
try: try:
#only reschedule pipelines which run on cep4 #only reschedule pipelines which run on cep4
...@@ -84,6 +84,7 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None): ...@@ -84,6 +84,7 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None):
except Exception as e: except Exception as e:
logger.error("Error while checking pipeline starttime: %s", e) logger.error("Error while checking pipeline starttime: %s", e)
class ScheduleChecker(): class ScheduleChecker():
def __init__(self, def __init__(self,
busname=DEFAULT_BUSNAME, busname=DEFAULT_BUSNAME,
...@@ -92,10 +93,10 @@ class ScheduleChecker(): ...@@ -92,10 +93,10 @@ class ScheduleChecker():
""" """
self._thread = None self._thread = None
self._running = False self._running = False
self._radbrpc = RARPC(busname=busname, broker=broker) self._radbrpc = RADBRPC.create(exchange=busname, broker=broker)
self._momrpc = MoMQueryRPC(busname=busname, broker=broker) self._momrpc = MoMQueryRPC.create(exchange=busname, broker=broker)
self._curpc = CleanupRPC(busname=busname, broker=broker) self._curpc = CleanupRPC.create(exchange=busname, broker=broker)
self._otdbrpc = OTDBRPC(busname=busname, broker=broker, timeout=180) self._otdbrpc = OTDBRPC.create(exchange=busname, broker=broker, timeout=180)
def __enter__(self): def __enter__(self):
"""Internal use only. (handles scope 'with')""" """Internal use only. (handles scope 'with')"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment