Skip to content
Snippets Groups Projects
Commit d3a809b9 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #9607: process successors for inserted tasks

parent 261b8013
No related branches found
No related tags found
No related merge requests found
...@@ -48,6 +48,9 @@ from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SER ...@@ -48,6 +48,9 @@ from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SER
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ResourceAssigner(): class ResourceAssigner():
...@@ -60,6 +63,8 @@ class ResourceAssigner(): ...@@ -60,6 +63,8 @@ class ResourceAssigner():
otdb_servicename=DEFAULT_OTDB_SERVICENAME, otdb_servicename=DEFAULT_OTDB_SERVICENAME,
ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME, ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME,
ra_notification_prefix=DEFAULT_RA_NOTIFICATION_PREFIX, ra_notification_prefix=DEFAULT_RA_NOTIFICATION_PREFIX,
mom_busname=DEFAULT_MOMQUERY_BUSNAME,
mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
broker=None): broker=None):
""" """
ResourceAssigner inserts/updates tasks in the radb and assigns resources to it based on incoming parset. ResourceAssigner inserts/updates tasks in the radb and assigns resources to it based on incoming parset.
...@@ -72,6 +77,7 @@ class ResourceAssigner(): ...@@ -72,6 +77,7 @@ class ResourceAssigner():
self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker) self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker)
self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True) self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True)
self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now
self.momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker)
self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker) self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker)
self.ra_notification_prefix = ra_notification_prefix self.ra_notification_prefix = ra_notification_prefix
...@@ -89,6 +95,7 @@ class ResourceAssigner(): ...@@ -89,6 +95,7 @@ class ResourceAssigner():
self.radbrpc.open() self.radbrpc.open()
self.rerpc.open() self.rerpc.open()
self.otdbrpc.open() self.otdbrpc.open()
self.momrpc.open()
self.ra_notification_bus.open() self.ra_notification_bus.open()
def close(self): def close(self):
...@@ -96,6 +103,7 @@ class ResourceAssigner(): ...@@ -96,6 +103,7 @@ class ResourceAssigner():
self.radbrpc.close() self.radbrpc.close()
self.rerpc.close() self.rerpc.close()
self.otdbrpc.close() self.otdbrpc.close()
self.momrpc.close()
self.ra_notification_bus.close() self.ra_notification_bus.close()
def doAssignment(self, specification_tree): def doAssignment(self, specification_tree):
...@@ -193,6 +201,7 @@ class ResourceAssigner(): ...@@ -193,6 +201,7 @@ class ResourceAssigner():
self._sendNotification(task, 'scheduled') self._sendNotification(task, 'scheduled')
self.processPredecessors(specification_tree) self.processPredecessors(specification_tree)
self.processSuccessors(task)
else: else:
logger.warning('doAssignment: Not all claims could be inserted. Setting task %s status to conflict' % (taskId)) logger.warning('doAssignment: Not all claims could be inserted. Setting task %s status to conflict' % (taskId))
self.radbrpc.updateTask(taskId, status='conflict') self.radbrpc.updateTask(taskId, status='conflict')
...@@ -240,6 +249,28 @@ class ResourceAssigner(): ...@@ -240,6 +249,28 @@ class ResourceAssigner():
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
def processSuccessors(self, task):
try:
successor_mom_ids = self.momrpc.getSuccessorIds(task['mom_id'])[str(task['mom_id'])]
if successor_mom_ids:
logger.info('proccessing successor mom_ids=%s for mom_id=%s otdb_id=%s', successor_mom_ids, task['mom_id'], task['otdb_id'])
for successor_mom_id in successor_mom_ids:
#check if the successor needs to be linked to this task
successor_task = self.radbrpc.getTask(mom_id=successor_mom_id)
if successor_task:
if successor_task['id'] not in task['successor_ids']:
logger.info('connecting successor task with otdb_id=%s to it\'s predecessor with otdb_id=%s', successor_task['otdb_id'], task['otdb_id'])
self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id'])
else:
logger.warning('could not find predecessor task with otdb_id=%s in radb for task otdb_id=%s', successor_task['otdb_id'], task['otdb_id'])
else:
logger.info('no successors for otdb_id=%s', task['otdb_id'])
except Exception as e:
logger.error(e)
def getMaxPredecessorEndTime(self, specification_tree): def getMaxPredecessorEndTime(self, specification_tree):
try: try:
predecessor_specs = [tree['specification'] for tree in specification_tree['predecessors']] predecessor_specs = [tree['specification'] for tree in specification_tree['predecessors']]
...@@ -352,4 +383,3 @@ class ResourceAssigner(): ...@@ -352,4 +383,3 @@ class ResourceAssigner():
claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids'] claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids']
logger.info('claimResources: %d claims were inserted in the radb' % len(claim_ids)) logger.info('claimResources: %d claims were inserted in the radb' % len(claim_ids))
return len(claim_ids) == len(claims), claim_ids return len(claim_ids) == len(claims), claim_ids
...@@ -140,6 +140,8 @@ def main(): ...@@ -140,6 +140,8 @@ def main():
otdb_servicename=options.otdb_servicename, otdb_servicename=options.otdb_servicename,
ra_notification_busname=options.ra_notification_busname, ra_notification_busname=options.ra_notification_busname,
ra_notification_prefix=options.ra_notification_prefix, ra_notification_prefix=options.ra_notification_prefix,
mom_busname=options.mom_query_busname,
mom_servicename=options.mom_query_servicename,
broker=options.broker) as assigner: broker=options.broker) as assigner:
with SpecifiedTaskListener(busname=options.notification_busname, with SpecifiedTaskListener(busname=options.notification_busname,
subject=options.notification_subject, subject=options.notification_subject,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment