Skip to content
Snippets Groups Projects
Commit 48b7c952 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8887: process estimator results, create and insert resource_claims and properties

parent 8fd91aad
No related branches found
No related tags found
No related merge requests found
......@@ -27,6 +27,7 @@ ResourceAssigner inserts/updates tasks and assigns resources to it based on inco
import logging
from datetime import datetime
import time
import collections
from lofar.messaging.RPC import RPC, RPCException
from lofar.parameterset import parameterset
......@@ -140,9 +141,14 @@ class ResourceAssigner():
logger.error("no otdb_id %s found in estimator results %s" % (otdb_id, needed))
return
if not taskType in needed[str(otdb_id)]:
logger.error("no task type %s found in estimator results %s" % (taskType, needed[str(otdb_id)]))
return
main_needed = needed[str(otdb_id)]
if self.checkResources(main_needed, available):
claimed, resourceIds = self.claimResources(main_needed, taskId, startTime, endTime)
task = self.radbrpc.getTask(taskId)
claimed, resourceIds = self.claimResources(main_needed, task)
if claimed:
self.radbrpc.updateTaskAndResourceClaims(taskId, claim_status='allocated')
self.radbrpc.updateTask(taskId, status='scheduled')
......@@ -218,22 +224,66 @@ class ResourceAssigner():
def checkResources(self, needed, available):
return True
def claimResources(self, resources, taskId, startTime, endTime):
#TEMP HACK
cep4storage = resources['observation']['total_data_size']
resources = dict()
resources['cep4storage'] = cep4storage
resourceNameDict = {r['name']:r for r in self.radbrpc.getResources()}
claimedStatusId = next(x['id'] for x in self.radbrpc.getResourceClaimStatuses() if x['name'].lower() == 'claimed')
resourceClaimIds = []
for r in resources:
if r in resourceNameDict:
resourceClaimIds.append(self.radbrpc.insertResourceClaim(resourceNameDict[r]['id'], taskId, startTime, endTime, claimedStatusId, 1, -1, 'anonymous', -1))
success = len(resourceClaimIds) == len(resources)
return success, resourceClaimIds
def claimResources(self, needed_resources, task):
logger.info('claimResources: task %s needed_resources=%s' % (task, needed_resources))
# get the needed resources for the task type
needed_resources_for_task_type = needed_resources[task['type']]
# get db lists
rc_property_types = {rcpt['name']:rcpt['id'] for rcpt in self.radbrpc.getResourceClaimPropertyTypes()}
resource_types = {rt['name']:rt['id'] for rt in self.radbrpc.getResourceTypes()}
resources = self.radbrpc.getResources()
# loop over needed_resources -> resource_type -> claim (and props)
# flatten the tree dict to a list of claims (with props)
claims = []
for resource_type_name, needed_claim_for_resource_type in needed_resources_for_task_type.items():
if resource_type_name in resource_types:
logger.info('claimResources: processing resource_type: %s' % resource_type_name)
db_resource_type_id = resource_types[resource_type_name]
db_resources_for_type = [r for r in resources if r['type_id'] == db_resource_type_id]
# needed_claim_for_resource_type is a dict containing exactly one kvp of which the value is an int
# that value is the value for the claim
needed_claim_value = next((v for k,v in needed_claim_for_resource_type.items() if isinstance(v, int)))
# FIXME: right now we just pick the first resource from the 'cep4' resources.
# estimator will deliver this info in the future
db_cep4_resources_for_type = [r for r in db_resources_for_type if 'cep4' in r['name'].lower()]
if db_cep4_resources_for_type:
claim = {'resource_id':db_cep4_resources_for_type[0]['id'],
'starttime':task['starttime'],
'endtime':task['endtime'],
'status':'claimed',
'claim_size':1}
# if the needed_claim_for_resource_type dict contains more kvp's,
# then the subdict contains groups of properties for the claim
if len(needed_claim_for_resource_type) > 1:
claim['properties'] = []
needed_prop_groups = next((v for k,v in needed_claim_for_resource_type.items() if isinstance(v, collections.Iterable)))
for group_name, needed_prop_group in needed_prop_groups.items():
if group_name == 'saps':
logger.info('skipping sap')
else:
for prop_type_name, prop_value in needed_prop_group.items():
if prop_type_name in rc_property_types:
rc_property_type_id = rc_property_types[prop_type_name]
property = {'type':rc_property_type_id, 'value':prop_value}
claim['properties'].append(property)
else:
logger.error('claimResources: unknown prop_type:%s' % prop_type_name)
logger.info('claimResources: created claim:%s' % claim)
claims.append(claim)
else:
logger.error('claimResources: unknown resource_type:%s' % resource_type_name)
logger.info('claimResources: inserting %d claims in the radb' % len(claims))
claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids']
logger.info('claimResources: %d claims were inserted in the radb' % len(claim_ids))
return len(claim_ids) == len(claims), claim_ids
......@@ -10,9 +10,6 @@ from lofar.messaging.RPC import RPC
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
logger.warning("TODO: fix this test")
exit(3)
try:
from mock import MagicMock
from mock import patch
......@@ -31,16 +28,25 @@ with patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC', a
mockRARPC = MockRARPC.return_value
# modify the return values of the various RARPC methods with pre-cooked answers
mockRARPC.getTask.return_value = {u'status': u'active', u'status_id': 600, u'type_id': 0, u'specification_id': 8, u'starttime': datetime.datetime(2016, 2, 14, 20, 0), u'mom_id': 634163, u'endtime': datetime.datetime(2016, 2, 14, 21, 30), u'type': u'Observation', u'id': 9355, u'otdb_id': 431140}
mockRARPC.getTask.return_value = {"endtime": datetime.datetime(2016, 3, 25, 22, 47, 31), "id": 2299, "mom_id": 351543, "name": "IS HBA_DUAL", "otdb_id": 1290472, "predecessor_ids": [], "project_mom_id": 2, "project_name": "test-lofar", "specification_id": 2323, "starttime": datetime.datetime(2016, 3, 25, 21, 47, 31), "status": "prescheduled", "status_id": 350, "successor_ids": [], "type": "observation", "type_id": 0}
mockRARPC.getResourceTypes.return_value = [{"id": 0, "name": "rsp", "unit": "rsp_channel_bit", "unit_id": 0}, {"id": 1, "name": "tbb", "unit": "bytes", "unit_id": 1}, {"id": 2, "name": "rcu", "unit": "rcu_board", "unit_id": 2}, {"id": 3, "name": "bandwidth", "unit": "bytes/second", "unit_id": 3}, {"id": 4, "name": "processor", "unit": "cores", "unit_id": 4}, {"id": 5, "name": "storage", "unit": "bytes", "unit_id": 1}]
mockRARPC.getResources.return_value = [{"id": 116, "name": "cep4bandwidth", "type": "bandwidth", "type_id": 3, "unit": "bytes/second"},{"id": 117, "name": "cep4storage", "type": "storage", "type_id": 5, "unit": "bytes"}]
mockRARPC.getResourceClaimPropertyTypes.return_value = [{"id": 0, "name": "nr_of_is_files"}, {"id": 1, "name": "nr_of_cs_files"}, {"id": 2, "name": "nr_of_uv_files"}, {"id": 3, "name": "nr_of_im_files"}, {"id": 4, "name": "nr_of_cores"}, {"id": 5, "name": "nr_of_beamlets"}, {"id": 6, "name": "nr_of_bits"}, {"id": 7, "name": "is_file_size"}]
def mockRARPC_insertResourceClaims(*arg, **kwarg):
logger.info("insertResourceClaims: %s" % ', '.join(str(x) for x in arg))
return {'ids':range(len(arg[1]))}
mockRARPC.insertResourceClaims.side_effect = mockRARPC_insertResourceClaims
#mock the RPC execute method
def mockRPCExecute(*arg, **kwarg):
#trick to get the servicename via the callstack from within this mock method
servicename = inspect.stack()[3][0].f_locals['self'].ServiceName
logger.info("mockRPCExecute servicename=%s" % servicename)
#give pre-cooked answer depending on called service
if servicename == 'ResourceEstimator':
return {'Observation':{'total_data_size':1, 'total_bandwidth':1, 'output_files':1}}, "OK"
if servicename == 'ResourceEstimation':
return {'1290472': {'observation': {'bandwidth': {'total_size': 9372800}, 'storage': {'total_size': 140592000, 'output_files': {'is': {'is_nr_stokes': 1, 'is_file_size': 36864000, 'nr_of_is_files': 1}, 'uv': {'nr_of_uv_files': 50, 'uv_file_size': 2074560}, 'saps': [{'nr_of_uv_files': 50, 'sap_nr': 0}, {'sap_nr': 0, 'nr_of_is_files': 1}]}}}}}, "OK"
elif servicename == 'SSDBService.GetActiveGroupNames':
return {0:'storagenodes', 1:'computenodes', 2:'archivenodes', 3:'locusnodes', 4:'cep4'}, "OK"
elif servicename == 'SSDBService.GetHostForGID':
......@@ -57,14 +63,25 @@ with patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC', a
class ResourceAssignerTest(unittest.TestCase):
'''Test the logic in the ResourceAssigner'''
def testResourceAssigner(self):
#def test_doAssignment(self):
#with ResourceAssigner() as assigner:
##define inputs
#specification_tree={u'task_type': u'pipeline', u'specification': {u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.stopTime': u'2016-03-25 14:15:37', u'Observation.VirtualInstrument.stationList': [], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], u'Observation.antennaSet': u'LBA_INNER', u'Observation.nrBitsPerSample': u'16', u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'2', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': True, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Version.number': u'33774', u'Observation.momID': u'351557', u'Observation.startTime': u'2016-03-25 14:14:57', u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'2', u'Observation.nrBeams': u'0', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], u'Observation.DataProducts.Output_Correlated.enabled': True, u'Observation.sampleClock': u'200'}, u'task_subtype': u'long baseline pipeline', u'state': u'prescheduled', u'otdb_id': 1290494, u'predecessors': [{u'task_subtype': u'averaging pipeline', u'specification': {u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.stopTime': u'2016-03-25 13:51:05', u'Observation.VirtualInstrument.stationList': [], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], u'Observation.antennaSet': u'LBA_INNER', u'Observation.nrBitsPerSample': u'16', u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'1', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': True, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep': u'10', u'Version.number': u'33774', u'Observation.momID': u'351556', u'Observation.startTime': u'2016-03-25 13:49:55', u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'1', u'Observation.nrBeams': u'0', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep': u'64', u'Observation.DataProducts.Output_Correlated.enabled': True, u'Observation.sampleClock': u'200'}, u'task_type': u'pipeline', u'otdb_id': 1290496, u'predecessors': [{u'task_subtype': u'bfmeasurement', u'specification': {u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor': u'1', u'Observation.stopTime': u'2016-03-26 00:33:31', u'Observation.VirtualInstrument.stationList': [u'RS205', u'RS503', u'CS013', u'RS508', u'RS106'], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': False, u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband': u'64', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which': u'I', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which': u'I', u'Observation.Beam[0].subbandList': [100, 101, 102, 103], u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile': u'512', u'Observation.DataProducts.Input_Correlated.skip': [], u'Observation.antennaSet': u'HBA_DUAL', u'Observation.nrBitsPerSample': u'8', u'Observation.Beam[0].nrTabRings': u'0', u'Observation.Beam[0].nrTiedArrayBeams': u'0', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye': False, u'Observation.nrBeams': u'1', u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime': u'1.0', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': False, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Version.number': u'33774', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor': u'1', u'Observation.momID': u'351539', u'Observation.startTime': u'2016-03-26 00:31:31', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile': u'512', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], u'Observation.DataProducts.Output_Correlated.enabled': True, u'Observation.sampleClock': u'200'}, u'task_type': u'observation', u'otdb_id': 1290476, u'predecessors': []}]}]}
##test the main assignment method
#assigner.doAssignment(specification_tree)
##TODO: added test asserts etc
def test_claimResources(self):
with ResourceAssigner() as assigner:
#define inputs
sasId='431140'
parsets={u'431140': {u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.stopTime': u'2016-02-14 21:30:00', u'Observation.VirtualInstrument.stationList': [u'CS005', u'CS001', u'CS011', u'CS401', u'CS002', u'CS007', u'CS201', u'CS032', u'CS003', u'CS101', u'CS028', u'CS017', u'CS024', u'CS103', u'CS026', u'CS501', u'CS031', u'CS301', u'CS030', u'CS302', u'CS004', u'CS006', u'CS021'], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': True, u'Task.type': u'Observation', u'Observation.Beam[0].subbandList': [51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 283, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 303, 304, 305, 306, 307, 308, 309, 310, 311, 312, 313, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323, 324, 325, 326, 327, 328, 329, 330, 331, 332, 333, 334, 335, 336, 337, 338, 339, 340, 341, 342, 343, 344, 345, 346, 347, 348, 349, 350, 351, 352, 353, 354, 355, 356, 357, 358, 359, 360, 361, 362, 363, 364, 365, 366, 367, 368, 369, 370, 371, 372, 373, 374, 375, 376, 377, 378, 379, 380, 381, 382, 383, 384, 385, 386, 387, 388, 389, 390, 391, 392, 393, 394, 395, 396, 397, 398, 399, 400, 401, 402, 403, 404, 405, 406, 407, 408, 409, 410, 411, 412, 413, 414, 415, 416, 417, 418, 419, 420, 421, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450], u'Observation.DataProducts.Input_Correlated.skip': [], u'Observation.antennaSet': u'LBA_OUTER', u'Observation.nrBitsPerSample': u'8', u'Observation.Beam[0].nrTabRings': u'0', u'Version.number': u'33385', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': False, u'Observation.Beam[0].TiedArrayBeam[0].coherent': True, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Task.subtype': u'BFMeasurement', u'Observation.momID': u'634163', u'Observation.startTime': u'2016-02-14 20:00:00', u'Observation.nrBeams': u'1', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], u'Observation.DataProducts.Output_Correlated.enabled': False, u'Observation.sampleClock': u'200'}}
estimator_output,_=assigner.rerpc()
needed_resources=estimator_output['1290472']
task = assigner.radbrpc.getTask(1290472)
#test the main assignment method
assigner.doAssignment(sasId, parsets)
#test claimResources method
assigner.claimResources(needed_resources, task)
#TODO: added test asserts etc
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment