diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py index 0295ca8ba6dcbbf595627a565e3f5cbd547e67fc..b1bd7fe8b0e3b9d50ce0f9d1d8dc41a3bebd2ead 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py @@ -32,16 +32,17 @@ import time from lofar.messaging.RPC import RPC, RPCException from lofar.parameterset import parameterset -from lofar.sas.resourceassignment.rotspservice.rpc import RARPC -from lofar.sas.resourceassignment.rotspservice.config import DEFAULT_BUSNAME as RADB_BUSNAME -from lofar.sas.resourceassignment.rotspservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME +from lofar.sas.resourceassignment.RAtoOTDBTaskSpecificationPropagator.rpc import RARPC +from lofar.sas.resourceassignment.RAtoOTDBTaskSpecificationPropagator.config import DEFAULT_BUSNAME as RADB_BUSNAME +from lofar.sas.resourceassignment.RAtoOTDBTaskSpecificationPropagator.config import DEFAULT_SERVICENAME as RADB_SERVICENAME from lofar.sas.otdb.otdbservice.config import DEFAULT_BUSNAME as OTDB_BUSNAME from lofar.sas.otdb.otdbservice.config import DEFAULT_SERVICENAME as OTDB_SERVICENAME +from lofar.sas.resourceassignment.RAtoOTDBTaskSpecificationPropagator.rpc import RAtoOTDBTranslator logger = logging.getLogger(__name__) -class RAtoOTDBTranslator(): +class RAtoOTDBPropagator(): def __init__(self, radb_busname=RADB_BUSNAME, radb_servicename=RADB_SERVICENAME, @@ -51,7 +52,7 @@ class RAtoOTDBTranslator(): otdb_broker=None, broker=None): """ - RAtoOTDBTranslator inserts/updates tasks in the radb and assigns resources to it based on incoming parset. + RAtoOTDBPropagator updates tasks in the OTDB after the ResourceAssigner is done with them. :param radb_busname: busname on which the radb service listens (default: lofar.ra.command) :param radb_servicename: servicename of the radb service (default: RADBService) :param radb_broker: valid Qpid broker host (default: None, which means localhost) @@ -66,6 +67,7 @@ class RAtoOTDBTranslator(): self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=radb_broker) self.otdbrpc = RPC(otdb_servicename, busname=otdb_busname, broker=otdb_broker, ForwardExceptions=True) + self.translator = RAtoOTDBTranslator() def __enter__(self): """Internal use only. (handles scope 'with')""" @@ -86,113 +88,27 @@ class RAtoOTDBTranslator(): self.radbrpc.close() self.otdbrpc.close() - def doTranslation(self, otdbId, momId, status='scheduled'): - logger.info('doTranslation: otdbId=%s momId=%s' % (sasId, momId)) - - #parse main parset... - mainParsetDict = parsets[str(sasId)] - mainParset = parameterset(mainParsetDict) - momId = mainParset.getInt('Observation.momID', -1) - taskType = mainParset.getString('Task.type', '') - if taskType.lower() == 'observation': - taskType = 'Observation' - startTime = datetime.strptime(mainParset.getString('Observation.startTime'), '%Y-%m-%d %H:%M:%S') - endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S') - - ##check if task already present in radb - #existingTask = self.radbrpc.getTask(otdb_id=sasId) - - #if existingTask: - ##present, so update task and specification in radb - #taskId = existingTask['id'] - #specificationId = existingTask['specification_id'] - #self.radbrpc.updateSpecification(specificationId, startTime, endTime, str(mainParsetDict)) - #self.radbrpc.updateTask(taskId, momId, sasId, status, taskType, specificationId) - #else: - #insert new task and specification in the radb - specificationId = self.radbrpc.insertSpecification(startTime, endTime, str(mainParsetDict))['id'] - taskId = self.radbrpc.insertTask(momId, sasId, status, taskType, specificationId)['id'] - - #analyze the parset for needed and available resources and claim these in the radb - cluster = self.parseSpecification(mainParset) - available = self.getAvailableResources(cluster) - - needed = self.getNeededResouces(mainParset) - - if self.checkResources(needed, available): - claimed, resourceIds = self.claimResources(needed, taskId, startTime, endTime) - if claimed: - self.commitResourceClaimsForTask(taskId) - self.radbrpc.updateTask(taskId, status='scheduled') - else: - self.radbrpc.updateTask(taskId, status='conflict') - - def parseSpecification(self, parset): - # TODO: cluster is not part of specification yet. For now return CEP4. Add logic later. - default = "cep2" - cluster ="cep4" - return cluster - - def getNeededResouces(self, parset): - replymessage, status = self.rerpc(parset.dict(), timeout=10) - logger.info('getNeededResouces: %s' % replymessage) - stations = parset.getStringVector('Observation.VirtualInstrument.stationList', '') - logger.info('Stations: %s' % stations) - return replymessage - - def getAvailableResources(self, cluster): - # Used settings - groupnames = {} - available = {} - while True: - try: - replymessage, status = self.ssdbGetActiveGroupNames() - if status == 'OK': - groupnames = replymessage - logger.info('SSDBService ActiveGroupNames: %s' % groupnames) - else: - logger.error("Could not get active group names from SSDBService: %s" % status) - - groupnames = {v:k for k,v in groupnames.items()} #swap key/value for name->id lookup - logger.info('groupnames: %s' % groupnames) - if cluster in groupnames.keys(): - groupId = groupnames[cluster] - replymessage, status = self.ssdbGetHostForGID(groupId) - if status == 'OK': - available = replymessage - logger.info('available: %s' % available) - else: - logger.error("Could not get hosts for group %s (gid=%s) from SSDBService: %s" % (cluster, groupId, status)) - else: - logger.error("group \'%s\' not known in SSDBService active groups (%s)" % (cluster, ', '.join(groupnames.keys()))) - return available - except KeyboardInterrupt: - break - except Exception as e: - logger.warning("Exception while getting available resources. Trying again... " + str(e)) - time.sleep(0.25) - - def checkResources(self, needed, available): - return True - - def claimResources(self, resources, taskId, startTime, endTime): - #TEMP HACK - cep4storage = resources['Observation']['total_data_size'] - resources = dict() - resources['cep4storage'] = cep4storage - - resourceNameDict = {r['name']:r for r in self.radbrpc.getResources()} - claimedStatusId = next(x['id'] for x in self.radbrpc.getResourceClaimStatuses() if x['name'].lower() == 'claimed') - - resourceClaimIds = [] - - for r in resources: - if r in resourceNameDict: - resourceClaimIds.append(self.radbrpc.insertResourceClaim(resourceNameDict[r]['id'], taskId, startTime, endTime, claimedStatusId, 1, -1, 'anonymous', -1)) - - success = len(resourceClaimIds) == len(resources) - return success, resourceClaimIds - - def commitResourceClaimsForTask(self, taskId): - self.radbrpc.updateResourceClaimsForTask(taskId, status='ALLOCATED') - + def doPropagation(self, otdbId, momId, status): #status has no default. + logger.info('doPropagation: otdbId=%s momId=%s' % (sasId, momId)) + + if not otdbId: + logger.warning('doPropagation no valid otdbId: otdbId=%s' % (sasId, momId)) + return + + if status == 'conflict': + self.otdbrpc.UpdateTreeStatus(otdbId, 'conflict') + elif status == 'scheduled': + RAinfo = self.getRAinfo() + OTDBinfo = self.translator.doTranslation(RAinfo) + self.setOTDBinfo(otdbId, OTDBinfo, 'scheduled') + else: + logger.warning('doPropagation received unknown status: %s' % (status,)) + + def getRAinfo(): + info = {} + self.radbrpc. + + def setOTDBinfo(otdbId, OTDBinfo, OTDBstatus) + r = self.otdbrpc.UpdateTreeKey(otdbId, OTDBinfo('OTDBkeys')) + if r: + r = self.otdbrpc.UpdateTreeStatus(otdbId, OTDBstatus) diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py index 1eade68326857a346d8ab1390508fc483b507c6d..b69af0fb2635abf3dc9669f16db26f18a0dba1d8 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py @@ -47,7 +47,7 @@ class RATaskScheduledListener(RATaskScheduledBusListener): busname=RATASKSCHEDULED_NOTIFICATION_BUSNAME, subject=RATASKSCHEDULED_NOTIFICATIONNAME, broker=None, - assigner=None, + propagator=None, ## TODO also give translator? **kwargs): """ RATaskScheduledListener listens on the lofar ?? bus and calls onTaskScheduled @@ -61,22 +61,22 @@ class RATaskScheduledListener(RATaskScheduledBusListener): """ super(RATaskScheduledListener, self).__init__(busname=busname, subject=subject, broker=broker, **kwargs) - self.translator = translator - if not self.translator: - self.translator = RAtoOTDBTranslator() + self.propagator = propagator + if not self.propagator: + self.propagator = RAtoOTDBPropagator() def onTaskScheduled(self, otdbId, momId, modificationTime): logger.info('onTaskScheduled: otdbId=%s' % otdbId) - self.translator.doTranslation(otdbId, momId, 'scheduled') + self.propagator.doPropagation(otdbId, momId, 'scheduled') # def onTaskConflict(self, otdbId, momId, modificationTime): # logger.info('onTaskConflict: otdbId=%s' % otdbId) # -# self.translator.doTranslation(otdbId, momId, 'conflict') +# self.propagator.doPropagation(otdbId, momId, 'conflict') -__all__ = ["TaskScheduledListener"] +__all__ = ["RATaskScheduledListener"] def main(): from optparse import OptionParser @@ -90,7 +90,7 @@ def main(): # Check the invocation arguments parser = OptionParser("%prog [options]", - description='runs the resourceassigner service') + description='runs the RAtoOTDBTaskSpecificationPropagator service') parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME) parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME, help="Name for this service, default: %s" % DEFAULT_SERVICENAME) @@ -107,15 +107,15 @@ def main(): logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG if options.verbose else logging.INFO) - with RAtoOTDBTranslator(radb_busname=options.radb_busname, + with RAtoOTDBPropagator(radb_busname=options.radb_busname, radb_servicename=options.radb_servicename, otdb_busname=options.otdb_busname, otdb_servicename=options.otdb_servicename, - broker=options.broker) as translator: + broker=options.broker) as propagator: with TaskScheduledListener(busname=options.notification_busname, subject=options.notification_subject, broker=options.broker, - translator=translator) as listener: + propagator=propagator) as listener: waitForInterrupt() if __name__ == '__main__':