diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index b0c49e4945929a52843b2432403ef679996d8bde..651c0b37a156ae5adba4ea39e2cdd75578ceb27d 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -48,6 +48,9 @@ from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SER from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX +from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC +from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME + logger = logging.getLogger(__name__) class ResourceAssigner(): @@ -60,6 +63,8 @@ class ResourceAssigner(): otdb_servicename=DEFAULT_OTDB_SERVICENAME, ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME, ra_notification_prefix=DEFAULT_RA_NOTIFICATION_PREFIX, + mom_busname=DEFAULT_MOMQUERY_BUSNAME, + mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, broker=None): """ ResourceAssigner inserts/updates tasks in the radb and assigns resources to it based on incoming parset. @@ -72,6 +77,7 @@ class ResourceAssigner(): self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker) self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True) self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now + self.momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker) self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker) self.ra_notification_prefix = ra_notification_prefix @@ -89,6 +95,7 @@ class ResourceAssigner(): self.radbrpc.open() self.rerpc.open() self.otdbrpc.open() + self.momrpc.open() self.ra_notification_bus.open() def close(self): @@ -96,6 +103,7 @@ class ResourceAssigner(): self.radbrpc.close() self.rerpc.close() self.otdbrpc.close() + self.momrpc.close() self.ra_notification_bus.close() def doAssignment(self, specification_tree): @@ -193,6 +201,7 @@ class ResourceAssigner(): self._sendNotification(task, 'scheduled') self.processPredecessors(specification_tree) + self.processSuccessors(task) else: logger.warning('doAssignment: Not all claims could be inserted. Setting task %s status to conflict' % (taskId)) self.radbrpc.updateTask(taskId, status='conflict') @@ -240,6 +249,28 @@ class ResourceAssigner(): except Exception as e: logger.error(e) + def processSuccessors(self, task): + try: + successor_mom_ids = self.momrpc.getSuccessorIds(task['mom_id'])[str(task['mom_id'])] + + if successor_mom_ids: + logger.info('proccessing successor mom_ids=%s for mom_id=%s otdb_id=%s', successor_mom_ids, task['mom_id'], task['otdb_id']) + + for successor_mom_id in successor_mom_ids: + #check if the successor needs to be linked to this task + successor_task = self.radbrpc.getTask(mom_id=successor_mom_id) + if successor_task: + if successor_task['id'] not in task['successor_ids']: + logger.info('connecting successor task with otdb_id=%s to it\'s predecessor with otdb_id=%s', successor_task['otdb_id'], task['otdb_id']) + self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id']) + else: + logger.warning('could not find predecessor task with otdb_id=%s in radb for task otdb_id=%s', successor_task['otdb_id'], task['otdb_id']) + else: + logger.info('no successors for otdb_id=%s', task['otdb_id']) + + except Exception as e: + logger.error(e) + def getMaxPredecessorEndTime(self, specification_tree): try: predecessor_specs = [tree['specification'] for tree in specification_tree['predecessors']] @@ -352,4 +383,3 @@ class ResourceAssigner(): claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids'] logger.info('claimResources: %d claims were inserted in the radb' % len(claim_ids)) return len(claim_ids) == len(claims), claim_ids - diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py index d00f7945c7dc995ff06ccbe1519789322a65f533..775d484b03e631d9eaab18b680353d8f88056aa5 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py @@ -140,6 +140,8 @@ def main(): otdb_servicename=options.otdb_servicename, ra_notification_busname=options.ra_notification_busname, ra_notification_prefix=options.ra_notification_prefix, + mom_busname=options.mom_query_busname, + mom_servicename=options.mom_query_servicename, broker=options.broker) as assigner: with SpecifiedTaskListener(busname=options.notification_busname, subject=options.notification_subject,