Skip to content
Snippets Groups Projects
Commit 07e5e52a authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-816: use direct radb connection. No need for (slower) messaging here

parent 5e125ce8
No related branches found
No related tags found
2 merge requests!59Merge LOFAR-Release-4_0 into master,!57Resolve SW-816
......@@ -32,7 +32,7 @@ from lofar.messaging.messagebus import ToBus
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.common.util import single_line_with_single_spaces
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
from lofar.sas.resourceassignment.database.radb import RADatabase
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX
......@@ -72,7 +72,7 @@ class ResourceAssigner(object):
:param radb_dbcreds: the credentials to be used for accessing the RADB (default: None, which means default)
"""
self.radbrpc = RADBRPC.create(exchange=exchange, broker=broker)
self.radb = RADatabase(dbcreds=radb_dbcreds)
self.rerpc = ResourceEstimatorRPC.create(exchange=exchange, broker=broker)
self.otdbrpc = OTDBRPC.create(exchange=exchange, broker=broker)
self.momrpc = MoMQueryRPC.create(exchange=exchange, broker=broker)
......@@ -81,7 +81,7 @@ class ResourceAssigner(object):
self.ra_notification_bus = ToBus(exchange=exchange, broker=broker)
self.obscontrol = ObservationControlRPCClient.create(exchange=exchange, broker=broker)
self.resource_availability_checker = ResourceAvailabilityChecker(self.radbrpc)
self.resource_availability_checker = ResourceAvailabilityChecker(self.radb)
# For the DwellScheduler instances created during run-time we store the following variables
self.radb_creds = radb_dbcreds
......@@ -97,8 +97,8 @@ class ResourceAssigner(object):
self.close()
def open(self):
"""Open rpc connections to radb service and resource estimator service"""
self.radbrpc.open()
"""Open connections to various service/buses/databases"""
self.radb.connect()
self.rerpc.open()
self.otdbrpc.open()
self.momrpc.open()
......@@ -108,9 +108,9 @@ class ResourceAssigner(object):
self.obscontrol.open()
def close(self):
"""Close rpc connections to radb service and resource estimator service"""
"""Close connections to various service/buses/databases"""
self.obscontrol.close()
self.radbrpc.close()
self.radb.disconnect()
self.rerpc.close()
self.otdbrpc.close()
self.momrpc.close()
......@@ -123,7 +123,7 @@ class ResourceAssigner(object):
def resource_types(self):
""" Returns a dict of all the resource types, to convert name->id. """
return {rt['name']: rt['id'] for rt in self.radbrpc.getResourceTypes()}
return {rt['name']: rt['id'] for rt in self.radb.getResourceTypes()}
def do_assignment(self, otdb_id, specification_tree):
"""
......@@ -140,9 +140,9 @@ class ResourceAssigner(object):
:raises an Exception if something unforeseen happened while scheduling
"""
logger.info(('do_assignment: otdb_id=%s specification_tree=%s' % (otdb_id, specification_tree)))
logger.info('do_assignment: otdb_id=%s specification_tree=%s', otdb_id, specification_tree)
spec = Specification(self.otdbrpc, self.momrpc, self.radbrpc)
spec = Specification(self.otdbrpc, self.momrpc, self.radb)
spec.from_dict(specification_tree)
spec.insert_into_radb() # TODO Move this to TaskSpecified?
......@@ -288,13 +288,14 @@ class ResourceAssigner(object):
(scheduler_result, changed_tasks) = scheduler.allocate_resources()
if not scheduler_result:
# try again with basic scheduler to end up with a situation with the 'normal' conflicting resources, which can then be evaluated by users
with BasicScheduler(task_id=spec.radb_id,
specification_tree=specification_tree,
resource_estimator=self._get_resource_estimates,
resource_availability_checker=self.resource_availability_checker,
radbcreds=self.radb_creds) as basic_scheduler:
basic_scheduler = BasicScheduler(task_id=spec.radb_id,
specification_tree=specification_tree,
resource_estimator=self._get_resource_estimates,
resource_availability_checker=self.resource_availability_checker,
radbcreds=self.radb_creds)
with basic_scheduler:
(scheduler_result, changed_tasks) = basic_scheduler.allocate_resources()
return scheduler_result
return scheduler_result
elif changed_tasks:
for t in changed_tasks:
if t.status == 'aborted': #MAC_Scheduler can't handle queued right now See also schedulers.py around line 600
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment