diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py index 45eb4b35ef9a5f0c57394cee5b5dde1b1a29db57..2b498c030d45f91af8efdcba1859c8b32acd0710 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py @@ -32,7 +32,7 @@ from lofar.messaging.messagebus import ToBus from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.common.util import single_line_with_single_spaces -from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC +from lofar.sas.resourceassignment.database.radb import RADatabase from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX @@ -72,7 +72,7 @@ class ResourceAssigner(object): :param radb_dbcreds: the credentials to be used for accessing the RADB (default: None, which means default) """ - self.radbrpc = RADBRPC.create(exchange=exchange, broker=broker) + self.radb = RADatabase(dbcreds=radb_dbcreds) self.rerpc = ResourceEstimatorRPC.create(exchange=exchange, broker=broker) self.otdbrpc = OTDBRPC.create(exchange=exchange, broker=broker) self.momrpc = MoMQueryRPC.create(exchange=exchange, broker=broker) @@ -81,7 +81,7 @@ class ResourceAssigner(object): self.ra_notification_bus = ToBus(exchange=exchange, broker=broker) self.obscontrol = ObservationControlRPCClient.create(exchange=exchange, broker=broker) - self.resource_availability_checker = ResourceAvailabilityChecker(self.radbrpc) + self.resource_availability_checker = ResourceAvailabilityChecker(self.radb) # For the DwellScheduler instances created during run-time we store the following variables self.radb_creds = radb_dbcreds @@ -97,8 +97,8 @@ class ResourceAssigner(object): self.close() def open(self): - """Open rpc connections to radb service and resource estimator service""" - self.radbrpc.open() + """Open connections to various service/buses/databases""" + self.radb.connect() self.rerpc.open() self.otdbrpc.open() self.momrpc.open() @@ -108,9 +108,9 @@ class ResourceAssigner(object): self.obscontrol.open() def close(self): - """Close rpc connections to radb service and resource estimator service""" + """Close connections to various service/buses/databases""" self.obscontrol.close() - self.radbrpc.close() + self.radb.disconnect() self.rerpc.close() self.otdbrpc.close() self.momrpc.close() @@ -123,7 +123,7 @@ class ResourceAssigner(object): def resource_types(self): """ Returns a dict of all the resource types, to convert name->id. """ - return {rt['name']: rt['id'] for rt in self.radbrpc.getResourceTypes()} + return {rt['name']: rt['id'] for rt in self.radb.getResourceTypes()} def do_assignment(self, otdb_id, specification_tree): """ @@ -140,9 +140,9 @@ class ResourceAssigner(object): :raises an Exception if something unforeseen happened while scheduling """ - logger.info(('do_assignment: otdb_id=%s specification_tree=%s' % (otdb_id, specification_tree))) + logger.info('do_assignment: otdb_id=%s specification_tree=%s', otdb_id, specification_tree) - spec = Specification(self.otdbrpc, self.momrpc, self.radbrpc) + spec = Specification(self.otdbrpc, self.momrpc, self.radb) spec.from_dict(specification_tree) spec.insert_into_radb() # TODO Move this to TaskSpecified? @@ -288,13 +288,14 @@ class ResourceAssigner(object): (scheduler_result, changed_tasks) = scheduler.allocate_resources() if not scheduler_result: # try again with basic scheduler to end up with a situation with the 'normal' conflicting resources, which can then be evaluated by users - with BasicScheduler(task_id=spec.radb_id, - specification_tree=specification_tree, - resource_estimator=self._get_resource_estimates, - resource_availability_checker=self.resource_availability_checker, - radbcreds=self.radb_creds) as basic_scheduler: + basic_scheduler = BasicScheduler(task_id=spec.radb_id, + specification_tree=specification_tree, + resource_estimator=self._get_resource_estimates, + resource_availability_checker=self.resource_availability_checker, + radbcreds=self.radb_creds) + with basic_scheduler: (scheduler_result, changed_tasks) = basic_scheduler.allocate_resources() - return scheduler_result + return scheduler_result elif changed_tasks: for t in changed_tasks: if t.status == 'aborted': #MAC_Scheduler can't handle queued right now See also schedulers.py around line 600