diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index 4a936b0f96845a70dd5a358015d1a133958c8639..54c73e12de8a67646a35ab574afa90458cbe3d57 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -27,6 +27,7 @@ ResourceAssigner inserts/updates tasks and assigns resources to it based on inco import logging from datetime import datetime import time +import collections from lofar.messaging.RPC import RPC, RPCException from lofar.parameterset import parameterset @@ -140,9 +141,14 @@ class ResourceAssigner(): logger.error("no otdb_id %s found in estimator results %s" % (otdb_id, needed)) return + if not taskType in needed[str(otdb_id)]: + logger.error("no task type %s found in estimator results %s" % (taskType, needed[str(otdb_id)])) + return + main_needed = needed[str(otdb_id)] if self.checkResources(main_needed, available): - claimed, resourceIds = self.claimResources(main_needed, taskId, startTime, endTime) + task = self.radbrpc.getTask(taskId) + claimed, resourceIds = self.claimResources(main_needed, task) if claimed: self.radbrpc.updateTaskAndResourceClaims(taskId, claim_status='allocated') self.radbrpc.updateTask(taskId, status='scheduled') @@ -218,22 +224,66 @@ class ResourceAssigner(): def checkResources(self, needed, available): return True - def claimResources(self, resources, taskId, startTime, endTime): - #TEMP HACK - cep4storage = resources['observation']['total_data_size'] - resources = dict() - resources['cep4storage'] = cep4storage - - resourceNameDict = {r['name']:r for r in self.radbrpc.getResources()} - claimedStatusId = next(x['id'] for x in self.radbrpc.getResourceClaimStatuses() if x['name'].lower() == 'claimed') - - resourceClaimIds = [] - - for r in resources: - if r in resourceNameDict: - resourceClaimIds.append(self.radbrpc.insertResourceClaim(resourceNameDict[r]['id'], taskId, startTime, endTime, claimedStatusId, 1, -1, 'anonymous', -1)) - - success = len(resourceClaimIds) == len(resources) - return success, resourceClaimIds + def claimResources(self, needed_resources, task): + logger.info('claimResources: task %s needed_resources=%s' % (task, needed_resources)) + + # get the needed resources for the task type + needed_resources_for_task_type = needed_resources[task['type']] + + # get db lists + rc_property_types = {rcpt['name']:rcpt['id'] for rcpt in self.radbrpc.getResourceClaimPropertyTypes()} + resource_types = {rt['name']:rt['id'] for rt in self.radbrpc.getResourceTypes()} + resources = self.radbrpc.getResources() + + # loop over needed_resources -> resource_type -> claim (and props) + # flatten the tree dict to a list of claims (with props) + claims = [] + for resource_type_name, needed_claim_for_resource_type in needed_resources_for_task_type.items(): + if resource_type_name in resource_types: + logger.info('claimResources: processing resource_type: %s' % resource_type_name) + db_resource_type_id = resource_types[resource_type_name] + db_resources_for_type = [r for r in resources if r['type_id'] == db_resource_type_id] + + # needed_claim_for_resource_type is a dict containing exactly one kvp of which the value is an int + # that value is the value for the claim + needed_claim_value = next((v for k,v in needed_claim_for_resource_type.items() if isinstance(v, int))) + + # FIXME: right now we just pick the first resource from the 'cep4' resources. + # estimator will deliver this info in the future + db_cep4_resources_for_type = [r for r in db_resources_for_type if 'cep4' in r['name'].lower()] + + if db_cep4_resources_for_type: + claim = {'resource_id':db_cep4_resources_for_type[0]['id'], + 'starttime':task['starttime'], + 'endtime':task['endtime'], + 'status':'claimed', + 'claim_size':1} + + # if the needed_claim_for_resource_type dict contains more kvp's, + # then the subdict contains groups of properties for the claim + if len(needed_claim_for_resource_type) > 1: + claim['properties'] = [] + needed_prop_groups = next((v for k,v in needed_claim_for_resource_type.items() if isinstance(v, collections.Iterable))) + + for group_name, needed_prop_group in needed_prop_groups.items(): + if group_name == 'saps': + logger.info('skipping sap') + else: + for prop_type_name, prop_value in needed_prop_group.items(): + if prop_type_name in rc_property_types: + rc_property_type_id = rc_property_types[prop_type_name] + property = {'type':rc_property_type_id, 'value':prop_value} + claim['properties'].append(property) + else: + logger.error('claimResources: unknown prop_type:%s' % prop_type_name) + + logger.info('claimResources: created claim:%s' % claim) + claims.append(claim) + else: + logger.error('claimResources: unknown resource_type:%s' % resource_type_name) + logger.info('claimResources: inserting %d claims in the radb' % len(claims)) + claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids'] + logger.info('claimResources: %d claims were inserted in the radb' % len(claim_ids)) + return len(claim_ids) == len(claims), claim_ids diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py index 474a7ec4703471945ba091b01b4ff9727114d65c..61df076ac1ef122f5e681d69006a3811ed7dd19a 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py @@ -10,9 +10,6 @@ from lofar.messaging.RPC import RPC logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) logger = logging.getLogger(__name__) -logger.warning("TODO: fix this test") -exit(3) - try: from mock import MagicMock from mock import patch @@ -31,16 +28,25 @@ with patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC', a mockRARPC = MockRARPC.return_value # modify the return values of the various RARPC methods with pre-cooked answers - mockRARPC.getTask.return_value = {u'status': u'active', u'status_id': 600, u'type_id': 0, u'specification_id': 8, u'starttime': datetime.datetime(2016, 2, 14, 20, 0), u'mom_id': 634163, u'endtime': datetime.datetime(2016, 2, 14, 21, 30), u'type': u'Observation', u'id': 9355, u'otdb_id': 431140} + mockRARPC.getTask.return_value = {"endtime": datetime.datetime(2016, 3, 25, 22, 47, 31), "id": 2299, "mom_id": 351543, "name": "IS HBA_DUAL", "otdb_id": 1290472, "predecessor_ids": [], "project_mom_id": 2, "project_name": "test-lofar", "specification_id": 2323, "starttime": datetime.datetime(2016, 3, 25, 21, 47, 31), "status": "prescheduled", "status_id": 350, "successor_ids": [], "type": "observation", "type_id": 0} + mockRARPC.getResourceTypes.return_value = [{"id": 0, "name": "rsp", "unit": "rsp_channel_bit", "unit_id": 0}, {"id": 1, "name": "tbb", "unit": "bytes", "unit_id": 1}, {"id": 2, "name": "rcu", "unit": "rcu_board", "unit_id": 2}, {"id": 3, "name": "bandwidth", "unit": "bytes/second", "unit_id": 3}, {"id": 4, "name": "processor", "unit": "cores", "unit_id": 4}, {"id": 5, "name": "storage", "unit": "bytes", "unit_id": 1}] + mockRARPC.getResources.return_value = [{"id": 116, "name": "cep4bandwidth", "type": "bandwidth", "type_id": 3, "unit": "bytes/second"},{"id": 117, "name": "cep4storage", "type": "storage", "type_id": 5, "unit": "bytes"}] + mockRARPC.getResourceClaimPropertyTypes.return_value = [{"id": 0, "name": "nr_of_is_files"}, {"id": 1, "name": "nr_of_cs_files"}, {"id": 2, "name": "nr_of_uv_files"}, {"id": 3, "name": "nr_of_im_files"}, {"id": 4, "name": "nr_of_cores"}, {"id": 5, "name": "nr_of_beamlets"}, {"id": 6, "name": "nr_of_bits"}, {"id": 7, "name": "is_file_size"}] + + def mockRARPC_insertResourceClaims(*arg, **kwarg): + logger.info("insertResourceClaims: %s" % ', '.join(str(x) for x in arg)) + return {'ids':range(len(arg[1]))} + mockRARPC.insertResourceClaims.side_effect = mockRARPC_insertResourceClaims #mock the RPC execute method def mockRPCExecute(*arg, **kwarg): #trick to get the servicename via the callstack from within this mock method servicename = inspect.stack()[3][0].f_locals['self'].ServiceName + logger.info("mockRPCExecute servicename=%s" % servicename) #give pre-cooked answer depending on called service - if servicename == 'ResourceEstimator': - return {'Observation':{'total_data_size':1, 'total_bandwidth':1, 'output_files':1}}, "OK" + if servicename == 'ResourceEstimation': + return {'1290472': {'observation': {'bandwidth': {'total_size': 9372800}, 'storage': {'total_size': 140592000, 'output_files': {'is': {'is_nr_stokes': 1, 'is_file_size': 36864000, 'nr_of_is_files': 1}, 'uv': {'nr_of_uv_files': 50, 'uv_file_size': 2074560}, 'saps': [{'nr_of_uv_files': 50, 'sap_nr': 0}, {'sap_nr': 0, 'nr_of_is_files': 1}]}}}}}, "OK" elif servicename == 'SSDBService.GetActiveGroupNames': return {0:'storagenodes', 1:'computenodes', 2:'archivenodes', 3:'locusnodes', 4:'cep4'}, "OK" elif servicename == 'SSDBService.GetHostForGID': @@ -57,14 +63,25 @@ with patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC', a class ResourceAssignerTest(unittest.TestCase): '''Test the logic in the ResourceAssigner''' - def testResourceAssigner(self): + #def test_doAssignment(self): + #with ResourceAssigner() as assigner: + ##define inputs + #specification_tree={u'task_type': u'pipeline', u'specification': {u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.stopTime': u'2016-03-25 14:15:37', u'Observation.VirtualInstrument.stationList': [], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], u'Observation.antennaSet': u'LBA_INNER', u'Observation.nrBitsPerSample': u'16', u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'2', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': True, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Version.number': u'33774', u'Observation.momID': u'351557', u'Observation.startTime': u'2016-03-25 14:14:57', u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'2', u'Observation.nrBeams': u'0', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], u'Observation.DataProducts.Output_Correlated.enabled': True, u'Observation.sampleClock': u'200'}, u'task_subtype': u'long baseline pipeline', u'state': u'prescheduled', u'otdb_id': 1290494, u'predecessors': [{u'task_subtype': u'averaging pipeline', u'specification': {u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.stopTime': u'2016-03-25 13:51:05', u'Observation.VirtualInstrument.stationList': [], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], u'Observation.antennaSet': u'LBA_INNER', u'Observation.nrBitsPerSample': u'16', u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'1', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': True, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep': u'10', u'Version.number': u'33774', u'Observation.momID': u'351556', u'Observation.startTime': u'2016-03-25 13:49:55', u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'1', u'Observation.nrBeams': u'0', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep': u'64', u'Observation.DataProducts.Output_Correlated.enabled': True, u'Observation.sampleClock': u'200'}, u'task_type': u'pipeline', u'otdb_id': 1290496, u'predecessors': [{u'task_subtype': u'bfmeasurement', u'specification': {u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor': u'1', u'Observation.stopTime': u'2016-03-26 00:33:31', u'Observation.VirtualInstrument.stationList': [u'RS205', u'RS503', u'CS013', u'RS508', u'RS106'], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': False, u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband': u'64', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which': u'I', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which': u'I', u'Observation.Beam[0].subbandList': [100, 101, 102, 103], u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile': u'512', u'Observation.DataProducts.Input_Correlated.skip': [], u'Observation.antennaSet': u'HBA_DUAL', u'Observation.nrBitsPerSample': u'8', u'Observation.Beam[0].nrTabRings': u'0', u'Observation.Beam[0].nrTiedArrayBeams': u'0', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye': False, u'Observation.nrBeams': u'1', u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime': u'1.0', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': False, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Version.number': u'33774', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor': u'1', u'Observation.momID': u'351539', u'Observation.startTime': u'2016-03-26 00:31:31', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile': u'512', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], u'Observation.DataProducts.Output_Correlated.enabled': True, u'Observation.sampleClock': u'200'}, u'task_type': u'observation', u'otdb_id': 1290476, u'predecessors': []}]}]} + + ##test the main assignment method + #assigner.doAssignment(specification_tree) + + ##TODO: added test asserts etc + + def test_claimResources(self): with ResourceAssigner() as assigner: #define inputs - sasId='431140' - parsets={u'431140': {u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.stopTime': u'2016-02-14 21:30:00', u'Observation.VirtualInstrument.stationList': [u'CS005', u'CS001', u'CS011', u'CS401', u'CS002', u'CS007', u'CS201', u'CS032', u'CS003', u'CS101', u'CS028', u'CS017', u'CS024', u'CS103', u'CS026', u'CS501', u'CS031', u'CS301', u'CS030', u'CS302', u'CS004', u'CS006', u'CS021'], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': True, u'Task.type': u'Observation', u'Observation.Beam[0].subbandList': [51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 283, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 303, 304, 305, 306, 307, 308, 309, 310, 311, 312, 313, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323, 324, 325, 326, 327, 328, 329, 330, 331, 332, 333, 334, 335, 336, 337, 338, 339, 340, 341, 342, 343, 344, 345, 346, 347, 348, 349, 350, 351, 352, 353, 354, 355, 356, 357, 358, 359, 360, 361, 362, 363, 364, 365, 366, 367, 368, 369, 370, 371, 372, 373, 374, 375, 376, 377, 378, 379, 380, 381, 382, 383, 384, 385, 386, 387, 388, 389, 390, 391, 392, 393, 394, 395, 396, 397, 398, 399, 400, 401, 402, 403, 404, 405, 406, 407, 408, 409, 410, 411, 412, 413, 414, 415, 416, 417, 418, 419, 420, 421, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450], u'Observation.DataProducts.Input_Correlated.skip': [], u'Observation.antennaSet': u'LBA_OUTER', u'Observation.nrBitsPerSample': u'8', u'Observation.Beam[0].nrTabRings': u'0', u'Version.number': u'33385', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': False, u'Observation.Beam[0].TiedArrayBeam[0].coherent': True, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Task.subtype': u'BFMeasurement', u'Observation.momID': u'634163', u'Observation.startTime': u'2016-02-14 20:00:00', u'Observation.nrBeams': u'1', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], u'Observation.DataProducts.Output_Correlated.enabled': False, u'Observation.sampleClock': u'200'}} + estimator_output,_=assigner.rerpc() + needed_resources=estimator_output['1290472'] + task = assigner.radbrpc.getTask(1290472) - #test the main assignment method - assigner.doAssignment(sasId, parsets) + #test claimResources method + assigner.claimResources(needed_resources, task) #TODO: added test asserts etc