diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py index 93b9c56386a69c2cc809b161dafb1a644edd9072..1ae1d64f4281586b4f8683e574bde651a1ec628a 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py @@ -30,10 +30,11 @@ to assign resources to these tasks. import qpid.messaging import logging from datetime import datetime +import time import pprint from lofar.sas.resourceassignment.rataskspecified.RABusListener import RATaskSpecifiedBusListener -from lofar.messaging.RPC import RPC +from lofar.messaging.RPC import RPC, RPCException import lofar.sas.resourceassignment.resourceassignmentservice.rpc as rarpc from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME @@ -62,9 +63,9 @@ class SpecifiedTaskListener(RATaskSpecifiedBusListener): def onTaskSpecified(self, sasId, modificationTime, resourceIndicators): logger.info('onTaskSpecified: sasId=%s resourceIndicators==%s' % (sasId, pprint.pformat(resourceIndicators))) - #cluster = parseSpecification(resourceIndicators) - #needed = getNeededResouces(resourceIndicators) - #available = getAvailableResources(cluster) + cluster = parseSpecification(resourceIndicators) + needed = getNeededResouces(resourceIndicators) + available = getAvailableResources(cluster) #if checkResources(needed, available): #result = claimResources(needed) #if result.success: @@ -75,43 +76,49 @@ class SpecifiedTaskListener(RATaskSpecifiedBusListener): #pass def parseSpecification(specification): - default = "CEP2" - cluster ="CEP4" + # TODO: cluster is not part of specification yet. For now return CEP4. Add logic later. + default = "cep2" + cluster ="cep4" return cluster def getNeededResouces(specification): - # Used settings - ServiceName = "ToUpper" - BusName = "simpletest" - - # Initialize a Remote Procedure Call object - with RPC(BusName,ServiceName) as remote_fn: - replymessage, status = remote_fn("Hello World ToUpper.") + with RPC('ResourceEstimation', busname='lofar.ra.command', broker='10.149.96.6') as rpc: + replymessage, status = rpc(specification) print replymessage def getAvailableResources(cluster): # Used settings - ServiceName = "GetServerState" - BusName = "simpletest" - - groupnames = [] - available = [] - with RPC(BusName, ServiceName) as GetServerState: - replymessage, status = GetServerState.getactivegroupnames() - if not status: - groupnames = replymessage - else: - logger.error("T") - if cluster in groupnames.keys(): - with RPC(BusName, ServiceName) as GetServerState: - replymessage, status = GetServerState.gethostsforgid(groupnames[cluster]) - if not status: - available = replymessage + groupnames = {} + available = {} + while True: + try: + with RPC('SSDBService.GetActiveGroupNames', busname='lofar.system', timeout=10, broker='10.149.96.6') as ssdbGetActiveGroupNames: + replymessage, status = ssdbGetActiveGroupNames() + if status == 'OK': + groupnames = replymessage + logger.info('SSDBService ActiveGroupNames: %s' % groupnames) + else: + logger.error("Could not get active group names from SSDBService: %s" % status) + + groupnames = {v:k for k,v in groupnames.items()} #swap key/value for name->id lookup + logger.info('groupnames: %s' % groupnames) + if cluster in groupnames.keys(): + groupId = groupnames[cluster] + with RPC('SSDBService.GetHostForGID', busname='lofar.system', timeout=10, broker='10.149.96.6') as ssdbGetHostForGID: + replymessage, status = ssdbGetHostForGID(groupId) + if status == 'OK': + available = replymessage + logger.info('available: %s' % available) + else: + logger.error("Could not get hosts for group %s (gid=%s) from SSDBService: %s" % (cluster, groupId, status)) else: - logger.error("T") - else: - logger.error("T") - return available + logger.error("group \'%s\' not known in SSDBService active groups (%s)" % (cluster, ', '.join(groupnames.keys()))) + return available + except KeyboardInterrupt: + break + except Exception as e: + logger.warning("Exception while getting available resources. Trying again... " + str(e)) + time.sleep(0.25) def checkResources(needed, available): return True