diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/CMakeLists.txt b/SAS/ResourceAssignment/ResourceAssigner/lib/CMakeLists.txt index a0bdc997103cd89e0b439b214d6ae217e903292e..3d94fc4e5d3be20e76d4df2db12408812ebf7bce 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/CMakeLists.txt +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/CMakeLists.txt @@ -3,6 +3,7 @@ python_install( __init__.py raservice.py + rarpc.py resource_assigner.py resource_availability_checker.py rabuslistener.py diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/rarpc.py b/SAS/ResourceAssignment/ResourceAssigner/lib/rarpc.py new file mode 100755 index 0000000000000000000000000000000000000000..23d865a36d980de5c4815b8bd06e11d3ed545d61 --- /dev/null +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/rarpc.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 + +# ResourceAssigner.py: ResourceAssigner listens on the lofar ?? bus and calls onTaskSpecified +# +# Copyright (C) 2015 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# $Id: raservice.py 1580 2015-09-30 14:18:57Z loose $ + +""" +TaskSpecifiedListener listens to a bus on which specified tasks get published. It will then try +to assign resources to these tasks. +""" + +import logging +logger = logging.getLogger(__name__) + +from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging.rpc import RPCClientContextManagerMixin, RPCClient + + +class RARPC(RPCClientContextManagerMixin): + def __init__(self, rpc_client: RPCClient = None): + """Create an instance of the RARPC using the given RPCClient, + or if None given, to a default RPCClient connecting to the "RAService" service""" + super().__init__() + self._rpc_client = rpc_client or RPCClient(service_name="RAService") + + @staticmethod + def create(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER): + """Create an RARPC connecting to the given exchange/broker on the default "RAService" service""" + return RARPC(RPCClient(service_name="RAService", exchange=exchange, broker=broker)) + + def do_assignment(self, otdb_id, specification_tree): + return self._rpc_client.execute('do_assignment', otdb_id=otdb_id, specification_tree=specification_tree) + + diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py index 66709f74dffe1411ff16e8be70de95ad1eee5745..63edabd6e199251e19b8d88d98f5c743e4af4f6c 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py @@ -31,6 +31,7 @@ import logging from lofar.common import dbcredentials from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging.rpc import RPCService, ServiceMessageHandler from lofar.sas.resourceassignment.rataskspecified.RABusListener import RATaskSpecifiedEventMessageHandler, RATaskSpecifiedBusListener from lofar.sas.resourceassignment.resourceassigner.resource_assigner import ResourceAssigner from lofar.sas.resourceassignment.resourceassigner.schedulechecker import ScheduleChecker @@ -57,7 +58,59 @@ class SpecifiedTaskEventMessageHandler(RATaskSpecifiedEventMessageHandler): except Exception as e: logger.error(str(e)) -__all__ = ["SpecifiedTaskEventMessageHandler"] +class ResourceAssignerServiceMessageHandler(ServiceMessageHandler): + def __init__(self, assigner=None): + super().__init__() + self.assigner = assigner + + def do_assignment(self, otdb_id, specification_tree): + return self.assigner.do_assignment(otdb_id, specification_tree) + +class RAService: + def __init__(self, radbcreds: dbcredentials.DBCredentials=None, exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER): + if radbcreds is None: + radbcreds = dbcredentials.DBCredentials().get("RADB") + logger.info("Read default RADB dbcreds from disk: %s" % radbcreds.stringWithHiddenPassword()) + + self.assigner = ResourceAssigner(exchange=exchange, broker=broker, radb_dbcreds=radbcreds) + + # create a buslistener for the Event-Driven paradigm, doing resourceassignment upon receiving a RATaskSpecified-EventMessage + self.rpcservice = RPCService(service_name="RAService", + handler_type=ResourceAssignerServiceMessageHandler, + handler_kwargs={"assigner": self.assigner}, + exchange=exchange, + broker=broker) + + # create a service for the blocking RPC call, doing resourceassignment upon receiving a RequestMessage + self.rataskspecifiedbuslistener = RATaskSpecifiedBusListener(handler_type=SpecifiedTaskEventMessageHandler, + handler_kwargs={"assigner": self.assigner}, + exchange=exchange, + broker=broker) + + def start_listening(self): + self.assigner.open() + self.rpcservice.start_listening() + self.rataskspecifiedbuslistener.start_listening() + + def stop_listening(self): + self.rpcservice.stop_listening() + self.rataskspecifiedbuslistener.stop_listening() + self.assigner.close() + + def __enter__(self): + try: + self.start_listening() + return self + except Exception as e: + # __exit__ (and hence stop_listening) is not called when an exception is raised in __enter__ + # so, do our own cleanup (log, stop_listening and re-raise). + logger.exception("%s error: %s", self, e) + self.stop_listening() + raise + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop_listening() + def main(): @@ -84,21 +137,14 @@ def main(): (options, args) = parser.parse_args() - radb_dbcreds = dbcredentials.parse_options(options) + radbcreds = dbcredentials.parse_options(options) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG if options.verbose else logging.INFO) - with ResourceAssigner(exchange=options.exchange, - broker=options.broker, - radb_dbcreds=radb_dbcreds) as assigner: - with RATaskSpecifiedBusListener(handler_type=SpecifiedTaskEventMessageHandler, - handler_kwargs={"assigner": assigner}, - exchange=options.exchange, - broker=options.broker): - with ScheduleChecker(exchange=options.exchange, - broker=options.broker): - waitForInterrupt() + with RAService(radbcreds=radbcreds, exchange=options.exchange, broker=options.broker): + with ScheduleChecker(exchange=options.exchange, broker=options.broker): + waitForInterrupt() if __name__ == '__main__': main()