diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index cf6c56896e36ed184248d8a09cbd024b55a674eb..64caf997b00f6f17c305ce797662babdadb56085 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -130,54 +130,74 @@ class ResourceAssigner(): logger.info('doAssignment: specification_tree=%s' % (specification_tree)) otdb_id = specification_tree['otdb_id'] - taskType = specification_tree.get('task_type', '').lower() status = specification_tree.get('state', '').lower() - - if status not in ['approved', 'prescheduled']: # cep2 accepts both, cep4 only prescheduled, see below - logger.info('skipping specification for otdb_id=%s because status=%s', (otdb_id, status)) + if status != 'prescheduled': + logger.warn('skipping specification for task otdb_id=%s, because status=%s (not prescheduled)', otdb_id, status) return - #parse main parset... - mainParset = parameterset(specification_tree['specification']) + try: + mainParset = parameterset(specification_tree['specification']) + except Exception as e: + logger.error(str(e)) + return - momId = mainParset.getInt('Observation.momID', -1) + # Only assign resources for task output to known clusters + try: + clusterNameSet = self.getClusterNames(mainParset) + if str() in clusterNameSet or len(clusterNameSet) != 1: + # Empty set or name is always an error. + # TODO: To support >1 cluster per obs, + # self.radbrpc.insertSpecificationAndTask() as called below and the radb would need to take >1 cluster name + # Also, there is only 1 processingClusterName in the parset, but we do not always want to pipeline process all obs outputs, or not on 1 cluster + raise Exception('clusterNameSet must have a single non-empty name for all enabled DataProducts, but is: %s' % clusterNameSet) + clusterName = clusterNameSet.pop() + + # Retrieve known cluster names (not all may be a valid storage target, but we cannot know...) + knownClusterSet = {clus['name'] for clus in self.radbrpc.getResourceGroupNames('cluster')} + logger.info('known clusters: %s', knownClusterSet) + if clusterName not in knownClusterSet: + raise Exception('skipping resource assignment for task with cluster name %s not in known clusters %s' % (clusterName, knownClusterSet)) + except Exception as e: + logger.error(str(e)) + return - clusterIsCEP4 = self.checkClusterIsCEP4(mainParset) - clusterName = 'CEP4' if clusterIsCEP4 else 'CEP2' def applySaneStartEndTime(): - if clusterIsCEP4: - startTime = datetime.utcnow() + timedelta(minutes=1) + startTime = datetime.utcnow() + timedelta(minutes=1) - maxPredecessorEndTime = self.getMaxPredecessorEndTime(specification_tree) - if maxPredecessorEndTime and maxPredecessorEndTime > startTime: - startTime = maxPredecessorEndTime + timedelta(minutes=1) + maxPredecessorEndTime = self.getMaxPredecessorEndTime(specification_tree) + if maxPredecessorEndTime is not None and maxPredecessorEndTime > startTime: + startTime = maxPredecessorEndTime + timedelta(minutes=1) - taskDuration = mainParset.getInt('Observation.Scheduler.taskDuration', -1) - taskDuration = timedelta(seconds=taskDuration) if taskDuration > 0 else timedelta(hours=1) + taskDuration = mainParset.getInt('Observation.Scheduler.taskDuration') # no default passed on purpose + taskDuration = timedelta(seconds=taskDuration) if taskDuration > 0 else timedelta(hours=1) - endTime = startTime + taskDuration + endTime = startTime + taskDuration - logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', - startTime, endTime, otdb_id) + logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', + startTime, endTime, otdb_id) - try: - logger.info('uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', startTime, endTime, otdb_id) - self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.startTime': startTime.strftime('%Y-%m-%d %H:%M:%S'), - 'LOFAR.ObsSW.Observation.stopTime': endTime.strftime('%Y-%m-%d %H:%M:%S')}) - except Exception as e: - logger.error(e) + logger.info('uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', startTime, endTime, otdb_id) + self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.startTime': startTime.strftime('%Y-%m-%d %H:%M:%S'), + 'LOFAR.ObsSW.Observation.stopTime': endTime.strftime('%Y-%m-%d %H:%M:%S')}) + return startTime, endTime - return startTime, endTime + # TODO: don't fix crap. Bad start/stop time has to go to error, like any other bad spec part. Fix the cause! Idem for MoM fix up below. try: startTime = datetime.strptime(mainParset.getString('Observation.startTime'), '%Y-%m-%d %H:%M:%S') endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S') - if clusterIsCEP4 and startTime < datetime.utcnow(): - startTime, endTime = applySaneStartEndTime() + if startTime < datetime.utcnow(): + raise ValueError('parsed startTime (UTC) lies in the past (or large system clock offset)') # same exc type as strptime except ValueError: - logger.warning('cannot parse for start/end time from specification for otdb_id=%s. searching for sane defaults...', otdb_id) - startTime, endTime = applySaneStartEndTime() + logger.warning('cannot parse start/stop time from specification for otdb_id=%s. Searching for sane defaults...', otdb_id) + try: + startTime, endTime = applySaneStartEndTime() + if startTime < datetime.utcnow(): + raise Exception('"sane" start/stop time turned out to be insane') + except Exception as e: + logger.error(str(e)) + return try: # fix for MoM bug introduced before NV's holiday @@ -186,180 +206,217 @@ class ResourceAssigner(): processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '') if processingClusterName != clusterName: logger.info('overwriting and uploading processingClusterName to otdb from %s to %s for otdb_id=%s', - processingClusterName, clusterName, otdb_id) + processingClusterName, clusterName, otdb_id) self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName }) except Exception as e: - logger.error(e) + logger.error(str(e)) + return + + logger.info('doAssignment: Accepted specification') # insert new task and specification in the radb # any existing specification and task with same otdb_id will be deleted automatically + momId = mainParset.getInt('Observation.momID', -1) + taskType = specification_tree.get('task_type', '') # i.e. u'observation' or u'pipeline logger.info('doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s cluster=%s' % (momId, otdb_id, status, taskType, startTime, endTime, clusterName)) - result = self.radbrpc.insertSpecificationAndTask(momId, otdb_id, status, taskType, startTime, endTime, str(mainParset), clusterName) - + try: + result = self.radbrpc.insertSpecificationAndTask(momId, otdb_id, status, taskType, + startTime, endTime, str(mainParset), clusterName) + except Exception as e: + logger.error(str(e)) + result = dict() # handled next if not result['inserted']: logger.error('could not insert specification and task') return specificationId = result['specification_id'] taskId = result['task_id'] - logger.info('doAssignment: inserted specification (id=%s) and task (id=%s)' % (specificationId,taskId)) + logger.info('doAssignment: inserted specification (id=%s) and task (id=%s)' % (specificationId, taskId)) - task = self.radbrpc.getTask(taskId) - self.processPredecessors(task) - self.processSuccessors(task) - - # do not assign resources to task for other clusters than cep4 - if not clusterIsCEP4: + task = None + try: + task = self.radbrpc.getTask(taskId) + except Exception as e: + logger.error(str(e)) return + logger.info('doAssignment: task=%s', task) - if status != 'prescheduled': - logger.info('skipping resource assignment for CEP4 task otdb_id=%s because status=%s' % (otdb_id, status)) - return + # From now on we can and have to _sendStateChange() to 'error' or otherwise on unrecov errors + errStatus = 'error' try: - needed = self.getNeededResouces(specification_tree) - logger.info('doAssignment: getNeededResouces=%s' % (needed,)) + self.processPredecessors(task) + self.processSuccessors(task) except Exception as e: - logger.error(e) - self.radbrpc.updateTask(taskId, status='error') - self._sendNotification(task, 'error') + logger.exception(str(e)) + self._sendStateChange(task, errStatus) return - if not str(otdb_id) in needed: - logger.error("no otdb_id %s found in estimator results %s" % (otdb_id, needed)) + # Request estimates of needed resources from Resource Estimator. Check reply. + try: + reReply, rerpcStatus = self.rerpc({"specification_tree" : specification_tree}, timeout=10) + logger.info('doAssignment: Resource Estimator reply = %s', reReply) + + if not str(otdb_id) in reReply: + raise Exception("no otdb_id %s found in estimator results %s" % (otdb_id, reReply)) + estimates = reReply[str(otdb_id)] + + if not taskType in estimates: + raise Exception("no task type %s found in estimator results %s" % (taskType, estimates)) + estimates = estimates[taskType] + + if 'errors' in estimates and estimates['errors']: + for error in estimates['errors']: + logger.error("Error from Resource Estimator: %s", error) + raise Exception("Error(s) in estimator for otdb_id=%s radb_id=%s" % (otdb_id, taskId)) + + if not 'estimates' in estimates: + raise Exception("no estimates found in estimator results %s" % estimates) + estimates = estimates['estimates'] + except Exception as e: + logger.error(str(e)) + self._sendStateChange(task, errStatus) return - if not taskType in needed[str(otdb_id)]: - logger.error("no task type %s found in estimator results %s" % (taskType, needed[str(otdb_id)])) + # get resources and related info from radb + try: + db_resource_group_mship = self.radbrpc.getResourceGroupMemberships() + db_rgp2rgp = db_resource_group_mship['groups'] # resource-group-to-resource-group relations + db_r2rgp = db_resource_group_mship['resources'] # resource-to-resource-group relations + + db_resource_list = self.radbrpc.getResources(include_availability=True) + db_resource_types = {rt['name']:rt['id'] for rt in self.radbrpc.getResourceTypes()} + db_resource_prop_types = {rcpt['name']:rcpt['id'] for rcpt in self.radbrpc.getResourceClaimPropertyTypes()} + except Exception as e: + logger.error(str(e)) + self._sendStateChange(task, errStatus) return - # claim the resources for this task - # during the claim inserts the claims are automatically validated - # and if not enough resources are available, then they are put to conflict status - # also, if any claim is in conflict state, then the task is put to conflict status as well - main_needed = needed[str(otdb_id)] - - if 'errors' in main_needed and main_needed['errors']: - for error in main_needed['errors']: - logger.error("Error in estimator: %s", error) - - logger.error("Error(s) in estimator for otdb_id=%s radb_id=%s, setting task status to 'error'", otdb_id, taskId) - self.radbrpc.updateTask(taskId, status='error') - self._sendNotification(task, 'error') - else: - claimed, claim_ids = self.claimResources(main_needed, task) - if claimed: - conflictingClaims = self.radbrpc.getResourceClaims(task_ids=taskId, status='conflict') + # Assume estimates are (close enough to) accurate to determine resource claims for this task. + # Try to a set of non-conflicting claims from availability info in the RA DB. + # If that fails, insert no claims and set the task straight to conflict. + # Also, inserted claims are still automatically validated, maybe there is a race. + # If not enough resources are available after all, claims are put to conflict status. + # If any claim is in conflict state, then the task is put to conflict status as well. + claims = self.getClaimsForTask(task, estimates, db_resource_list, db_rgp2rgp, db_r2rgp, db_resource_types, db_resource_prop_types) + if claims: + claim_ids = None + logger.info('doAssignment: inserting %d claims in the radb: %s' % (len(claims), claims)) + try: + claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids'] + logger.info('doAssignment: %d claims were inserted in the radb' % len(claim_ids)) + if not claim_ids or len(claim_ids) != len(claims): + raise Exception('doAssignment: too few claims were inserted in the radb') + conflictingClaims = self.radbrpc.getResourceClaims(task_ids=taskId, status='conflict') if conflictingClaims: - # radb set's task status to conflict automatically - logger.warning('doAssignment: %s conflicting claims detected. Task cannot be scheduled. %s' % - (len(conflictingClaims), conflictingClaims)) - self._sendNotification(task, 'conflict') - else: - logger.info('doAssignment: all claims for task %s were succesfully claimed. Setting claim statuses to allocated' % (taskId,)) - self.radbrpc.updateTaskAndResourceClaims(taskId, claim_status='allocated') - - # remove any output and/or intermediate data for restarting CEP4 pipelines - if task['type'] == 'pipeline': - du_result = self.sqrpc.getDiskUsageForOTDBId(task['otdb_id'], include_scratch_paths=True, force_update=True) - if du_result['found'] and du_result.get('disk_usage', 0) > 0: - logger.info("removing data on disk from previous run for otdb_id %s", otdb_id) - result = self.curpc.removeTaskData(task['otdb_id']) - - if not result['deleted']: - logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s", otdb_id, result['message']) - - # send notification that the task was scheduled, - # another service sets the parset spec in otdb, and updated otdb task status to scheduled, which is then synced to radb - self._sendNotification(task, 'scheduled') - else: - if claim_ids: - logger.warning('doAssignment: Not all claims could be inserted. Setting task %s status to conflict' % (taskId)) - self.radbrpc.updateTask(taskId, status='conflict') - self._sendNotification(task, 'conflict') - else: - logger.warning('doAssignment: No claims could be made. Setting task %s status to error' % (taskId)) - self.radbrpc.updateTask(taskId, status='error') - self._sendNotification(task, 'error') - - def _sendNotification(self, task, status): + # Will do _sendStateChange(task, 'conflict'), and radb sets task status to conflict automatically + raise Exception('doAssignment: Task cannot be scheduled, because of %d conflicting claims: %s' % + (len(conflictingClaims), conflictingClaims)) + + logger.info('doAssignment: all resources for task %s were succesfully claimed. Setting claim statuses to allocated' % (taskId,)) + self.radbrpc.updateTaskAndResourceClaims(taskId, claim_status='allocated') + except Exception as e: + logger.error(str(e)) + claims = None # handle claim insertion issues as if we could not produce valid claims + if not claims: + self._sendStateChange(task, 'conflict') + return + + # remove any output and/or intermediate data for restarting pipelines + if task['type'] == 'pipeline': + try: + du_result = self.sqrpc.getDiskUsageForOTDBId(task['otdb_id'], include_scratch_paths=True, force_update=True) + if du_result['found'] and du_result.get('disk_usage', 0) > 0: + logger.info("removing data on disk from previous run for otdb_id %s", otdb_id) + result = self.curpc.removeTaskData(task['otdb_id']) + if not result['deleted']: + logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s", otdb_id, result['message']) + except Exception as e: + logger.error(str(e)) # in line with failure as warning just above: allow going to scheduled state here too + + # send notification that the task was scheduled + self._sendStateChange(task, 'scheduled') + + def _sendStateChange(self, task, status): + if status == 'scheduled' or status == 'conflict' or status == 'error': + content={'radb_id': task['id'], 'otdb_id':task['otdb_id'], 'mom_id': task['mom_id']} + subject= 'Task' + status[0].upper() + status[1:] + else: # should not end up here (bug) + logger.error('_sendStateChange(): bug: Not sending notification as status is %s' % status) + return + try: - if status == 'scheduled' or status == 'conflict' or status == 'error': - content={'radb_id': task['id'], 'otdb_id':task['otdb_id'], 'mom_id': task['mom_id']} - subject= 'Task' + status[0].upper() + status[1:] - msg = EventMessage(context=self.ra_notification_prefix + subject, content=content) - logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) - self.ra_notification_bus.send(msg) + if status != 'scheduled': + # another service sets the parset spec in otdb, and updated otdb task status to scheduled, which is then synced to radb + self.radbrpc.updateTask(task['id'], status=status) + + msg = EventMessage(context=self.ra_notification_prefix + subject, content=content) + logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) + self.ra_notification_bus.send(msg) except Exception as e: logger.error(str(e)) def processPredecessors(self, task): - try: - predecessor_mom_ids = self.momrpc.getPredecessorIds(task['mom_id'])[str(task['mom_id'])] - - if predecessor_mom_ids: - logger.info('proccessing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, task['mom_id'], task['otdb_id']) - - for predecessor_mom_id in predecessor_mom_ids: - #check if the predecessor needs to be linked to this task - predecessor_task = self.radbrpc.getTask(mom_id=predecessor_mom_id) - if predecessor_task: - if predecessor_task['id'] not in task['predecessor_ids']: - logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to it\'s successor with mom_id=%s otdb_id=%s', predecessor_task['mom_id'], - predecessor_task['otdb_id'], - task['mom_id'], - task['otdb_id']) - self.radbrpc.insertTaskPredecessor(task['id'], predecessor_task['id']) - else: - logger.warning('could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', predecessor_mom_id, task['otdb_id']) + mom_id = task['mom_id'] + + predecessor_ids = self.momrpc.getPredecessorIds(mom_id) + if str(mom_id) not in predecessor_ids or not predecessor_ids[str(mom_id)]: + logger.info('no predecessors for otdb_id=%s mom_id=%s', task['otdb_id'], mom_id) + return + predecessor_mom_ids = predecessor_ids[str(mom_id)] + + logger.info('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, task['mom_id'], task['otdb_id']) + + for predecessor_mom_id in predecessor_mom_ids: + # check if the predecessor needs to be linked to this task + predecessor_task = self.radbrpc.getTask(mom_id=predecessor_mom_id) + if predecessor_task: + if predecessor_task['id'] not in task['predecessor_ids']: + logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s otdb_id=%s', + predecessor_task['mom_id'], predecessor_task['otdb_id'], task['mom_id'], task['otdb_id']) + self.radbrpc.insertTaskPredecessor(task['id'], predecessor_task['id']) else: - logger.info('no predecessors for otdb_id=%s', task['otdb_id']) + logger.warning('could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', predecessor_mom_id, task['otdb_id']) - except Exception as e: - logger.error(e) def processSuccessors(self, task): - try: - successor_mom_ids = self.momrpc.getSuccessorIds(task['mom_id'])[str(task['mom_id'])] - - if successor_mom_ids: - logger.info('proccessing successor mom_ids=%s for mom_id=%s otdb_id=%s', successor_mom_ids, task['mom_id'], task['otdb_id']) - - for successor_mom_id in successor_mom_ids: - #check if the successor needs to be linked to this task - successor_task = self.radbrpc.getTask(mom_id=successor_mom_id) - if successor_task: - if successor_task['id'] not in task['successor_ids']: - logger.info('connecting successor task with mom_id=%s otdb_id=%s to it\'s predecessor with mom_id=%s otdb_id=%s', successor_task['mom_id'], - successor_task['otdb_id'], - task['mom_id'], - task['otdb_id']) - self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id']) - movePipelineAfterItsPredecessors(successor_task, self.radbrpc) - else: - logger.warning('could not find successor task with mom_id=%s in radb for task mom_id=%s otdb_id=%s', successor_mom_id, task['mom_id'], task['otdb_id']) - else: - logger.info('no successors for otdb_id=%s', task['otdb_id']) + mom_id = task['mom_id'] - except Exception as e: - logger.error(e) + successor_ids = self.momrpc.getSuccessorIds(mom_id) + if str(mom_id) not in successor_ids or not successor_ids[str(mom_id)]: + logger.info('no successors for otdb_id=%s mom_id=%s', task['otdb_id'], mom_id) + return + successor_mom_ids = successor_ids[str(mom_id)] + + logger.info('processing successor mom_ids=%s for mom_id=%s otdb_id=%s', successor_mom_ids, task['mom_id'], task['otdb_id']) + + for successor_mom_id in successor_mom_ids: + # check if the successor needs to be linked to this task + successor_task = self.radbrpc.getTask(mom_id=successor_mom_id) + if successor_task: + if successor_task['id'] not in task['successor_ids']: + logger.info('connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s otdb_id=%s', + successor_task['mom_id'], successor_task['otdb_id'], task['mom_id'], task['otdb_id']) + self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id']) + movePipelineAfterItsPredecessors(successor_task, self.radbrpc) + else: + logger.warning('could not find successor task with mom_id=%s in radb for task otdb_id=%s', successor_mom_id, task['otdb_id']) def getMaxPredecessorEndTime(self, specification_tree): - try: - predecessor_specs = [parameterset(tree['specification']) for tree in specification_tree['predecessors']] - predecessor_endTimes = [datetime.strptime(spec.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S') for spec in predecessor_specs] - - if predecessor_endTimes: - return max(predecessor_endTimes) - except Exception as e: - logger.error(e) + predecessor_specs = [parameterset(tree['specification']) for tree in specification_tree['predecessors']] + predecessor_endTimes = [datetime.strptime(spec.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S') for spec in predecessor_specs] + if predecessor_endTimes: + return max(predecessor_endTimes) return None - def checkClusterIsCEP4(self, parset): - # check storageClusterName for enabled DataProducts - # if any storageClusterName is not CEP4, we do not accept this parset + def getClusterNames(self, parset): + """ Return set of storage cluster names for all enabled output data product types in parset, + or raise for an enabled output data product type without storage cluster name. + """ + clusterNames = set() + keys = ['Output_Correlated', 'Output_IncoherentStokes', 'Output_CoherentStokes', @@ -368,96 +425,187 @@ class ResourceAssigner(): 'Output_Pulsar'] for key in keys: if parset.getBool('Observation.DataProducts.%s.enabled' % key, False): - if parset.getString('Observation.DataProducts.%s.storageClusterName' % key, '') != 'CEP4': - logger.warn("storageClusterName not CEP4, rejecting specification.") - return False - - logger.info("all enabled storageClusterName's are CEP4, accepting specification.") - return True + name = parset.getString('Observation.DataProducts.%s.storageClusterName' % key) # may raise; don't pass default arg + clusterNames.add(name) + return clusterNames - def getNeededResouces(self, specification_tree): - replymessage, status = self.rerpc({"specification_tree":specification_tree}, timeout=10) - logger.info('getNeededResouces: %s' % replymessage) - return replymessage - - def claimResources(self, needed_resources, task): - logger.info('claimResources: task %s needed_resources=%s' % (task, needed_resources)) - - # get the needed resources for the task type - needed_resources_for_task_type = needed_resources[task['type']] - - # get db lists - rc_property_types = {rcpt['name']:rcpt['id'] for rcpt in self.radbrpc.getResourceClaimPropertyTypes()} - resource_types = {rt['name']:rt['id'] for rt in self.radbrpc.getResourceTypes()} - resources = self.radbrpc.getResources() + def getClaimsForTask(self, task, needed_resources_list, db_resources_list, + db_rgp2rgp, db_r2rgp, db_resource_types, db_resource_prop_types): + """ Return claims that satisfy needed_resources_list within db_resources_list, + or 'not claims' if no such set of non-conflicting claims could be found. + """ + # This (for easy testing *pure*) function selects resources for task (i.e. obs or pipeline). + # Criteria: + # * It all fits within max fill percentage per resource group (or consider failed) + # * Avoid resources marked as unavailable + # * At most one claim per resource; i.e. merge where possible (DB friendly) + # * Only completely fill a storage resource when inevitable, as this also makes other + # resources (e.g. write bandwidth) unavailable for future tasks until clean up. + # * For storage, try to use all nodes, but if nodes have >1 storage area, + # prefer all claims on fewer resources per node. Both benefit subseq pipeline (I/O). + # * All 4 data products of complex voltage obs data (XXYY) go to a single (storage) resource + # (but no extra work here, because the Resource Estimator provides a single estimate). + # + # Note: certain (obs) settings are more flexible than ever used (e.g. sb list per SAP wrt CS/IS). + # We must support the full gamut of settings, but for scenarios that are complex *and* constrained, + # producing 'conflict' may be ok, if this saves implementation or excessive computational complexity. + # Users will then have to free more resources than strictly necessary before retrying. + # + # Strategy to fulfill criteria above: + # 1. Do a rough check on totals. Reject clearly impossible requests. + # 2. Try to produce a set of claims using all available resources + # Specifically for resource type 'bandwidth', if its name matches another resource name + # at the same resource group, allocate together (but do allow bandwidth-only claims). + # 3. When ok, see if using fewer than all available resources is preferable + # (e.g. use only 1 storage resource per node, rather than multiple) + # 4. Merge claims on each resource into a single claim + +#TODO: measure timing, then replace O(n) lookups in large lists by dicts or sets + logger.info('getClaimsForTask: db_rgp2rgp: %s', db_rgp2rgp) + logger.info('getClaimsForTask: db_r2rgp: %s', db_r2rgp) + logger.info('getClaimsForTask: db_resources_list: %s', db_resources_list) + logger.info('getClaimsForTask: db_resource_types: %s', db_resource_types) + logger.info('getClaimsForTask: db_resource_prop_types: %s', db_resource_prop_types) + + +# db_rgp2rgp: {0: {u'resource_group_id': 0, u'resource_ids': [], u'child_ids': [1, 2], u'resource_group_name': u'LOFAR', u'parent_ids': []}, 1: {u'resource_group_id': 1, u'resource_ids': [116, 117], u'child_ids': [3], u'resource_group_name': u'CEP4', u'parent_ids': [0]}, 2: {u'resource_group_id': 2, u'resource_ids': [], u'child_ids': [55, 56, 57, 58, 59, 60, 61, 62], u'resource_group_name': u'Cobalt', u'parent_ids': [0]}, ...} + +# db_r2rgp: {0: {u'resource_id': 0, u'resource_name': u'bandwidth', u'parent_group_ids': [5]}, 1: {u'resource_id': 1, u'resource_name': u'processor', u'parent_group_ids': [5]}, 2: {u'resource_id': 2, u'resource_name': u'bandwidth', u'parent_group_ids': [6]}, ...} + +# db_resources_list: [{u'total_capacity': 50000000000, u'name': u'bandwidth', u'type_id': 3, u'available_capacity': 50000000000, u'type_name': u'bandwidth', u'unit_id': 3, u'active': True, u'used_capacity': 0, u'id': 0, u'unit': u'bits/second'}, {u'total_capacity': 24, u'name': u'processor', u'type_id': 4, u'available_capacity': 24, u'type_name': u'processor', u'unit_id': 4, u'active': True, u'used_capacity': 0, u'id': 1, u'unit': u'cores'}, ...] + +# db_resource_types: {u'rcu': 2, u'storage': 5, u'bandwidth': 3, u'tbb': 1, u'rsp': 0, u'processor': 4} + +# db_resource_prop_types: {u'img_file_size': 12, u'nr_of_uv_files': 2, u'pulp_otdb_id': 21, u'is_otdb_id': 18, u'img_otdb_id': 20, u'start_sb_nr': 15, u'uv_otdb_id': 16, u'cs_otdb_id': 17, u'im_file_size': 11, u'nr_of_pulp_files': 13, u'nr_of_tabs': 14, u'uv_file_size': 10, u'nr_of_is_files': 0, u'nr_of_img_files': 4, u'pulp_file_size': 24, u'start_sbg_nr': 23, u'is_tab_nr': 22, u'nr_of_cs_files': 1, u'nr_of_cs_stokes': 6, u'nr_of_im_files': 3, u'cs_file_size': 9, u'is_file_size': 8, u'nr_of_is_stokes': 7, u'im_otdb_id': 19} - # loop over needed_resources -> resource_type -> claim (and props) - # flatten the tree dict to a list of claims (with props) claims = [] - for resource_type_name, needed_claim_for_resource_type in needed_resources_for_task_type.items(): - if resource_type_name in resource_types: - logger.info('claimResources: processing resource_type: %s contents: %s' % (resource_type_name, needed_claim_for_resource_type)) - db_resource_type_id = resource_types[resource_type_name] - db_resources_for_type = [r for r in resources if r['type_id'] == db_resource_type_id] - - # needed_claim_for_resource_type is a dict containing exactly one kvp of which the value is an int - # that value is the value for the claim - needed_claim_value = next((v for k,v in needed_claim_for_resource_type.items() if isinstance(v, int))) - - # FIXME: right now we just pick the first resource from the 'cep4' resources. - # estimator will deliver this info in the future - db_cep4_resources_for_type = [r for r in db_resources_for_type if 'cep4' in r['name'].lower()] - - if db_cep4_resources_for_type: - claim = {'resource_id':db_cep4_resources_for_type[0]['id'], - 'starttime':task['starttime'], - 'endtime':task['endtime'], - 'status':'claimed', - 'claim_size':needed_claim_value} - - # FIXME: find proper way to extend storage time with a year - # 2016-09-27 scisup would like to be involved in chosing these kind of defaults - # and what to do after the claim expires - # we now choose a default period of a year, and do nothing if the claim expires - if 'storage' in db_cep4_resources_for_type[0]['name']: - claim['endtime'] += timedelta(days=365) - - # if the needed_claim_for_resource_type dict contains more kvp's, - # then the subdicts contains groups of properties for the claim - if len(needed_claim_for_resource_type) > 1: - claim['properties'] = [] - - def processProperties(propertiesDict, sap_nr=None, is_input=False): - for prop_type_name, prop_value in propertiesDict.items(): - if prop_type_name in rc_property_types: - rc_property_type_id = rc_property_types[prop_type_name] - property = {'type':rc_property_type_id, - 'value':prop_value, - 'io_type': 'input' if is_input else 'output'} - if sap_nr is not None: - property['sap_nr'] = sap_nr - claim['properties'].append(property) - else: - logger.warn('claimResources: unknown prop_type:%s' % prop_type_name) - - subdicts = {k:v for k,v in needed_claim_for_resource_type.items() if isinstance(v, dict)} - for subdict_name, subdict in subdicts.items(): - logger.info('claimResources: processing resource_type: %s subdict_name: \'%s\' subdict_contents: %s' % (resource_type_name, subdict_name, subdict)) - is_input = 'input' in subdict_name.lower() - for group_name, needed_prop_group in subdict.items(): - if group_name == 'saps': - for sap_dict in needed_prop_group: - processProperties(sap_dict['properties'], sap_dict['sap_nr'], is_input) - else: - processProperties(needed_prop_group, None, is_input) - - logger.info('claimResources: created claim:%s' % claim) - claims.append(claim) - else: - logger.warn('claimResources: unknown resource_type:%s' % resource_type_name) - logger.info('claimResources: inserting %d claims in the radb' % len(claims)) - claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids'] - logger.info('claimResources: %d claims were inserted in the radb' % len(claim_ids)) - return (len(claim_ids) > 0 and len(claim_ids) == len(claims)), claim_ids + db_active_resource_list = [r for r in db_resources_list if r['active']] + + for needed_res in needed_resources_list: + needed_resource_types = needed_res['resource_types'] # e.g. {'bandwidth': 123456789, 'storage': 987654321} + needed_resource_type_ids = [db_resource_types[name] for name in needed_resource_types.keys()] + db_needed_resource_list = [r for r in db_active_resource_list if r['type_id'] in needed_resource_type_ids] + db_resource_list = [] + + logger.info('getClaimsForTask: needed_resource_types: %s', needed_resource_types) + logger.info('getClaimsForTask: needed_resource_type_ids: %s', needed_resource_type_ids) + logger.info('getClaimsForTask: db_needed_resource_list: %s', db_needed_resource_list) + + # Compile list of resource group IDs whose resources we care about, + # then filter db resource list with it. +# TODO: tmp: deal with [0] only + needed_root_resource_group_name = needed_res['root_resource_groups'][0] + target_resgids = self._get_subtree_resource_group_ids(db_rgp2rgp, needed_root_resource_group_name) + db_resource_list.extend([r for r in db_needed_resource_list if db_r2rgp[r['id']]['parent_group_ids'][0] in target_resgids]) + + logger.info('getClaimsForTask: needed_root_resource_group_name: %s', needed_root_resource_group_name) + logger.info('getClaimsForTask: target_resgids: %s', target_resgids) + logger.info('getClaimsForTask: db_resource_list: %s', db_resource_list) + + # Try to find in db_resource_list: needed_res['count'] * needed_resource_types + for r in xrange(needed_res['count']): + pass + + if claims: + break + + return claims + + for resource_type_name, needed_claim_for_resource_type in needed_res.items(): +# if resource_type_name not in db_resource_types: +# logger.warn('claimResources: unknown resource_type: %s' % resource_type_name) +# continue + + logger.info('claimResources: processing resource_type: %s, contents: %s' % (resource_type_name, needed_claim_for_resource_type)) + db_resource_type_id = db_resource_types[resource_type_name] + db_resources_for_type = [r for r in resources if r['type_id'] == db_resource_type_id] + + # needed_claim_for_resource_type is a dict containing exactly one kvp of which the value is an int + # that value is the value for the claim + needed_claim_value = next((v for k,v in needed_claim_for_resource_type.items() if isinstance(v, int))) + + # FIXME: right now we just pick the first resource from the 'cep4' resources. + # estimator will deliver this info in the future + db_cep4_resources_for_type = [r for r in db_resources_for_type if 'cep4' in r['name'].lower()] + if db_cep4_resources_for_type: +# claim = getResourceClaim(db_cep4_resources_for_type) +# logger.info('claimResources: created CEP4 claim: %s' % claim) +# claims.append(claim) +# +# db_dragnet_resources_for_type = [r for r in db_resources_for_type if 'dragnet' in r['name'].lower()] +# if db_dragnet_resources_for_type: +# claim = getResourceClaim(db_dragnet_resources_for_type) +# logger.info('claimResources: created DRAGNET claim: %s' % claim) +# claims.append(claim) +# +# def getResourceClaim(db_resources_for_type): + + claim = {'resource_id':db_resources_for_type[0]['id'], + 'starttime':task['starttime'], + 'endtime':task['endtime'], + 'status':'claimed', + 'claim_size':needed_claim_value} + + # FIXME: find proper way to extend storage time with a year + # 2016-09-27 scisup would like to be involved in chosing these kind of defaults + # and what to do after the claim expires + # we now choose a default period of a year, and do nothing if the claim expires + if 'storage' in db_cep4_resources_for_type[0]['name']: + claim['endtime'] += timedelta(days=365) + + # if the needed_claim_for_resource_type dict contains more kvp's, + # then the subdicts contains groups of properties for the claim + if len(needed_claim_for_resource_type) > 1: + claim['properties'] = [] + + def processProperties(propertiesDict, sap_nr=None, is_input=False): + for prop_type_name, prop_value in propertiesDict.items(): + if prop_type_name in db_resource_prop_types: + rc_property_type_id = db_resource_prop_types[prop_type_name] + property = {'type':rc_property_type_id, + 'value':prop_value, + 'io_type': 'input' if is_input else 'output'} + if sap_nr is not None: + property['sap_nr'] = sap_nr + claim['properties'].append(property) + else: + logger.warn('claimResources: unknown prop_type:%s' % prop_type_name) + + subdicts = {k:v for k,v in needed_claim_for_resource_type.items() if isinstance(v, dict)} + for subdict_name, subdict in subdicts.items(): + logger.info('claimResources: processing resource_type: %s subdict_name: \'%s\' subdict_contents: %s' % (resource_type_name, subdict_name, subdict)) + is_input = 'input' in subdict_name.lower() + for group_name, needed_prop_group in subdict.items(): + if group_name == 'saps': + for sap_dict in needed_prop_group: + processProperties(sap_dict['properties'], sap_dict['sap_nr'], is_input) + else: + processProperties(needed_prop_group, None, is_input) + + logger.info('claimResources: created claim: %s' % claim) + claims.append(claim) + + return claims + + def _get_subtree_resource_group_ids(self, res_grp_to_res_grp_dict, root_name): + # Search breadth-first through child ids under the absolute root resource group (id 0), + # until we find root_name. Then populate the retval starting with root_name's group id. + all_gids = [0] + subtree_gids = [] + + for gid in all_gids: + res_grp = res_grp_to_res_grp_dict[gid] + logger.info('all_gids = %s; subtree_gids = %s; res_grp: %s', all_gids, subtree_gids, res_grp) + if not subtree_gids: + if res_grp['resource_group_name'] == root_name: + subtree_gids = [gid] + subtree_gids.extend(res_grp['child_ids']) + all_gids = [] + else: # root_name already found + subtree_gids.extend(res_grp['child_ids']) + all_gids.extend(res_grp['child_ids']) + + return subtree_gids +