Skip to content
Snippets Groups Projects
Commit 972e90d8 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #9607: available_capacity for cep4storage resource is updated regurarly...

Task #9607: available_capacity for cep4storage resource is updated regurarly by datamanagement service, so no need anymore to call ssdb
parent fad5abbd
No related branches found
No related tags found
No related merge requests found
...@@ -42,10 +42,6 @@ with patch('lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.rpc ...@@ -42,10 +42,6 @@ with patch('lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.rpc
#give pre-cooked answer depending on called service #give pre-cooked answer depending on called service
if servicename == 'ResourceEstimator': if servicename == 'ResourceEstimator':
return {'Observation':{'total_data_size':1, 'total_bandwidth':1, 'output_files':1}}, "OK" return {'Observation':{'total_data_size':1, 'total_bandwidth':1, 'output_files':1}}, "OK"
elif servicename == 'SSDBService.GetActiveGroupNames':
return {0:'storagenodes', 1:'computenodes', 2:'archivenodes', 3:'locusnodes', 4:'cep4'}, "OK"
elif servicename == 'SSDBService.GetHostForGID':
return {u'groupname': u'cep4', u'nodes': [{u'claimedspace': 0, u'totalspace': 702716, u'statename': u'Active', u'usedspace': 23084, u'id': 1, u'groupname': u'cep4', u'path': u'/lustre', u'hostname': u'lustre001'}]}, "OK"
return None, None return None, None
......
...@@ -45,10 +45,6 @@ from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFA ...@@ -45,10 +45,6 @@ from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFA
from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.sas.systemstatus.service.SSDBrpc import SSDBRPC
from lofar.sas.systemstatus.service.config import DEFAULT_SSDB_BUSNAME
from lofar.sas.systemstatus.service.config import DEFAULT_SSDB_SERVICENAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX
...@@ -60,8 +56,6 @@ class ResourceAssigner(): ...@@ -60,8 +56,6 @@ class ResourceAssigner():
radb_servicename=RADB_SERVICENAME, radb_servicename=RADB_SERVICENAME,
re_busname=RE_BUSNAME, re_busname=RE_BUSNAME,
re_servicename=RE_SERVICENAME, re_servicename=RE_SERVICENAME,
ssdb_busname=DEFAULT_SSDB_BUSNAME,
ssdb_servicename=DEFAULT_SSDB_SERVICENAME,
otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME, otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
otdb_servicename=DEFAULT_OTDB_SERVICENAME, otdb_servicename=DEFAULT_OTDB_SERVICENAME,
ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME, ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME,
...@@ -73,13 +67,10 @@ class ResourceAssigner(): ...@@ -73,13 +67,10 @@ class ResourceAssigner():
:param radb_servicename: servicename of the radb service (default: RADBService) :param radb_servicename: servicename of the radb service (default: RADBService)
:param re_busname: busname on which the resource estimator service listens (default: lofar.ra.command) :param re_busname: busname on which the resource estimator service listens (default: lofar.ra.command)
:param re_servicename: servicename of the resource estimator service (default: ResourceEstimation) :param re_servicename: servicename of the resource estimator service (default: ResourceEstimation)
:param ssdb_busname: busname on which the ssdb service listens (default: lofar.system)
:param ssdb_servicename: servicename of the radb service (default: SSDBService)
:param broker: Valid Qpid broker host (default: None, which means localhost) :param broker: Valid Qpid broker host (default: None, which means localhost)
""" """
self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker) self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker)
self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True) self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True)
self.ssdbrpc = SSDBRPC(servicename=ssdb_servicename, busname=ssdb_busname, broker=broker)
self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now
self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker) self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker)
self.ra_notification_prefix = ra_notification_prefix self.ra_notification_prefix = ra_notification_prefix
...@@ -98,7 +89,6 @@ class ResourceAssigner(): ...@@ -98,7 +89,6 @@ class ResourceAssigner():
self.radbrpc.open() self.radbrpc.open()
self.rerpc.open() self.rerpc.open()
self.otdbrpc.open() self.otdbrpc.open()
self.ssdbrpc.open()
self.ra_notification_bus.open() self.ra_notification_bus.open()
def close(self): def close(self):
...@@ -106,7 +96,6 @@ class ResourceAssigner(): ...@@ -106,7 +96,6 @@ class ResourceAssigner():
self.radbrpc.close() self.radbrpc.close()
self.rerpc.close() self.rerpc.close()
self.otdbrpc.close() self.otdbrpc.close()
self.ssdbrpc.close()
self.ra_notification_bus.close() self.ra_notification_bus.close()
def doAssignment(self, specification_tree): def doAssignment(self, specification_tree):
...@@ -174,13 +163,6 @@ class ResourceAssigner(): ...@@ -174,13 +163,6 @@ class ResourceAssigner():
logger.error("no task type %s found in estimator results %s" % (taskType, needed[str(otdb_id)])) logger.error("no task type %s found in estimator results %s" % (taskType, needed[str(otdb_id)]))
return return
# make sure the availability in the radb is up to date
# TODO: this should be updated regularly
try:
self.updateAvailableResources('cep4')
except Exception as e:
logger.warning("Exception while updating available resources: %s" % str(e))
# claim the resources for this task # claim the resources for this task
# during the claim inserts the claims are automatically validated # during the claim inserts the claims are automatically validated
# and if not enough resources are available, then they are put to conflict status # and if not enough resources are available, then they are put to conflict status
...@@ -271,34 +253,6 @@ class ResourceAssigner(): ...@@ -271,34 +253,6 @@ class ResourceAssigner():
logger.info('getNeededResouces: %s' % replymessage) logger.info('getNeededResouces: %s' % replymessage)
return replymessage return replymessage
def updateAvailableResources(self, cluster):
# find out which resources are available
# and what is their capacity
# For now, only look at CEP4 storage
# Later, also look at stations up/down for short term scheduling
#get all active groupnames, find id for cluster group
groupnames = self.ssdbrpc.getactivegroupnames()
cluster_group_id = next(k for k,v in groupnames.items() if v == cluster)
# for CEP4 cluster, do hard codes lookup of first and only node
node_info = self.ssdbrpc.gethostsforgid(cluster_group_id)['nodes'][0]
storage_resources = self.radbrpc.getResources(resource_types='storage', include_availability=True)
cep4_storage_resource = next(x for x in storage_resources if 'cep4' in x['name'])
active = node_info['statename'] == 'Active'
total_capacity = node_info['totalspace']
available_capacity = total_capacity - node_info['usedspace']
logger.info("Updating resource availability of %s (id=%s) to active=%s available_capacity=%s total_capacity=%s" %
(cep4_storage_resource['name'], cep4_storage_resource['id'], active, available_capacity, total_capacity))
self.radbrpc.updateResourceAvailability(cep4_storage_resource['id'],
active=active,
available_capacity=available_capacity,
total_capacity=total_capacity)
def claimResources(self, needed_resources, task): def claimResources(self, needed_resources, task):
logger.info('claimResources: task %s needed_resources=%s' % (task, needed_resources)) logger.info('claimResources: task %s needed_resources=%s' % (task, needed_resources))
......
...@@ -83,8 +83,6 @@ def main(): ...@@ -83,8 +83,6 @@ def main():
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.sas.systemstatus.service.config import DEFAULT_SSDB_BUSNAME
from lofar.sas.systemstatus.service.config import DEFAULT_SSDB_SERVICENAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX
...@@ -114,12 +112,6 @@ def main(): ...@@ -114,12 +112,6 @@ def main():
help="Name of the resource estimator service. [default: %default]") help="Name of the resource estimator service. [default: %default]")
parser.add_option("--otdb_busname", dest="otdb_busname", type="string", default=DEFAULT_OTDB_SERVICE_BUSNAME, help="Name of the bus on which the OTDB service listens, default: %default") parser.add_option("--otdb_busname", dest="otdb_busname", type="string", default=DEFAULT_OTDB_SERVICE_BUSNAME, help="Name of the bus on which the OTDB service listens, default: %default")
parser.add_option("--otdb_servicename", dest="otdb_servicename", type="string", default=DEFAULT_OTDB_SERVICENAME, help="Name of the OTDB service, default: %default") parser.add_option("--otdb_servicename", dest="otdb_servicename", type="string", default=DEFAULT_OTDB_SERVICENAME, help="Name of the OTDB service, default: %default")
parser.add_option("--ssdb_busname", dest="ssdb_busname", type="string",
default=DEFAULT_SSDB_BUSNAME,
help="Name of the bus on which the ssdb service listens. [default: %default]")
parser.add_option("--ssdb_servicename", dest="ssdb_servicename", type="string",
default=DEFAULT_SSDB_SERVICENAME,
help="Name of the ssdb service. [default: %default]")
parser.add_option("--ra_notification_busname", dest="ra_notification_busname", type="string", parser.add_option("--ra_notification_busname", dest="ra_notification_busname", type="string",
default=DEFAULT_RA_NOTIFICATION_BUSNAME, default=DEFAULT_RA_NOTIFICATION_BUSNAME,
help="Name of the notification bus on which the resourceassigner publishes its notifications. [default: %default]") help="Name of the notification bus on which the resourceassigner publishes its notifications. [default: %default]")
...@@ -139,8 +131,6 @@ def main(): ...@@ -139,8 +131,6 @@ def main():
re_servicename=options.re_servicename, re_servicename=options.re_servicename,
otdb_busname=options.otdb_busname, otdb_busname=options.otdb_busname,
otdb_servicename=options.otdb_servicename, otdb_servicename=options.otdb_servicename,
ssdb_busname=options.ssdb_busname,
ssdb_servicename=options.ssdb_servicename,
ra_notification_busname=options.ra_notification_busname, ra_notification_busname=options.ra_notification_busname,
ra_notification_prefix=options.ra_notification_prefix, ra_notification_prefix=options.ra_notification_prefix,
broker=options.broker) as assigner: broker=options.broker) as assigner:
......
...@@ -47,10 +47,6 @@ with patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC', a ...@@ -47,10 +47,6 @@ with patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC', a
#give pre-cooked answer depending on called service #give pre-cooked answer depending on called service
if servicename == 'ResourceEstimation': if servicename == 'ResourceEstimation':
return {'1290472': {'observation': {'bandwidth': {'total_size': 9372800}, 'storage': {'total_size': 140592000, 'output_files': {'is': {'is_nr_stokes': 1, 'is_file_size': 36864000, 'nr_of_is_files': 1}, 'uv': {'nr_of_uv_files': 50, 'uv_file_size': 2074560}, 'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 50, 'nr_of_is_files': 1}}]}}}}}, "OK" return {'1290472': {'observation': {'bandwidth': {'total_size': 9372800}, 'storage': {'total_size': 140592000, 'output_files': {'is': {'is_nr_stokes': 1, 'is_file_size': 36864000, 'nr_of_is_files': 1}, 'uv': {'nr_of_uv_files': 50, 'uv_file_size': 2074560}, 'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 50, 'nr_of_is_files': 1}}]}}}}}, "OK"
elif servicename == 'SSDBService.GetActiveGroupNames':
return {0:'storagenodes', 1:'computenodes', 2:'archivenodes', 3:'locusnodes', 4:'cep4'}, "OK"
elif servicename == 'SSDBService.GetHostForGID':
return {u'groupname': u'cep4', u'nodes': [{u'claimedspace': 0, u'totalspace': 702716, u'statename': u'Active', u'usedspace': 23084, u'id': 1, u'groupname': u'cep4', u'path': u'/lustre', u'hostname': u'lustre001'}]}, "OK"
return None, None return None, None
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment