#!/usr/bin/env python # Copyright (C) 2015-2017 # ASTRON (Netherlands Institute for Radio Astronomy) # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands # # This file is part of the LOFAR software suite. # The LOFAR software suite is free software: you can redistribute it # and/or modify it under the terms of the GNU General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # The LOFAR software suite is distributed in the hope that it will be # useful, but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # # $Id: assignment.py 1580 2015-09-30 14:18:57Z loose $ """ ResourceAssigner inserts/updates tasks and assigns resources to it based on incoming parset. """ import logging from datetime import datetime, timedelta import time from lofar.common.util import humanreadablesize from lofar.messaging.messages import EventMessage from lofar.messaging.messagebus import ToBus from lofar.messaging.RPC import RPC, RPCException from lofar.parameterset import parameterset from lofar.sas.resourceassignment.resourceassigner.schedulechecker import movePipelineAfterItsPredecessors from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME from lofar.sas.datamanagement.storagequery.rpc import StorageQueryRPC from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME as DEFAULT_STORAGEQUERY_BUSNAME from lofar.sas.datamanagement.storagequery.config import DEFAULT_SERVICENAME as DEFAULT_STORAGEQUERY_SERVICENAME from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME logger = logging.getLogger(__name__) class ResourceAssigner(): def __init__(self, radb_busname=RADB_BUSNAME, radb_servicename=RADB_SERVICENAME, re_busname=RE_BUSNAME, re_servicename=RE_SERVICENAME, otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME, otdb_servicename=DEFAULT_OTDB_SERVICENAME, storagequery_busname=DEFAULT_STORAGEQUERY_BUSNAME, storagequery_servicename=DEFAULT_STORAGEQUERY_SERVICENAME, cleanup_busname=DEFAULT_CLEANUP_BUSNAME, cleanup_servicename=DEFAULT_CLEANUP_SERVICENAME, ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME, ra_notification_prefix=DEFAULT_RA_NOTIFICATION_PREFIX, mom_busname=DEFAULT_MOMQUERY_BUSNAME, mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, broker=None): """ ResourceAssigner inserts/updates tasks in the radb and assigns resources to it based on incoming parset. :param radb_busname: busname on which the radb service listens (default: lofar.ra.command) :param radb_servicename: servicename of the radb service (default: RADBService) :param re_busname: busname on which the resource estimator service listens (default: lofar.ra.command) :param re_servicename: servicename of the resource estimator service (default: ResourceEstimation) :param broker: Valid Qpid broker host (default: None, which means localhost) """ self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker, timeout=180) self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True, timeout=180) self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180) ## , ForwardExceptions=True hardcoded in RPCWrapper right now self.momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker, timeout=180) self.sqrpc = StorageQueryRPC(busname=storagequery_busname, servicename=storagequery_servicename, broker=broker) self.curpc = CleanupRPC(busname=cleanup_busname, servicename=cleanup_servicename, broker=broker) self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker) self.ra_notification_prefix = ra_notification_prefix def __enter__(self): """Internal use only. (handles scope 'with')""" self.open() return self def __exit__(self, exc_type, exc_val, exc_tb): """Internal use only. (handles scope 'with')""" self.close() def open(self): """Open rpc connections to radb service and resource estimator service""" self.radbrpc.open() self.rerpc.open() self.otdbrpc.open() self.momrpc.open() self.sqrpc.open() self.curpc.open() self.ra_notification_bus.open() def close(self): """Close rpc connections to radb service and resource estimator service""" self.radbrpc.close() self.rerpc.close() self.otdbrpc.close() self.momrpc.close() self.sqrpc.close() self.curpc.close() self.ra_notification_bus.close() def doAssignment(self, specification_tree): logger.info('doAssignment: specification_tree=%s' % (specification_tree)) otdb_id = specification_tree['otdb_id'] status = specification_tree.get('state', '').lower() if status not in ['approved', 'prescheduled']: # For approved we only do a few checks and put it in the RADB logger.warn('skipping specification for task otdb_id=%s, because status=%s (not prescheduled)', otdb_id, status) return try: mainParset = parameterset(specification_tree['specification']) except Exception as e: logger.error(str(e)) return # Only assign resources for task output to known clusters try: clusterNameSet = self.getClusterNames(mainParset) if str() in clusterNameSet or len(clusterNameSet) != 1: # Empty set or name is always an error. # TODO: To support >1 cluster per obs, # self.radbrpc.insertSpecificationAndTask() as called below and the radb would need to take >1 cluster name # Also, there is only 1 processingClusterName in the parset, but we do not always want to pipeline process all obs outputs, or not on 1 cluster raise Exception('clusterNameSet must have a single non-empty name for all enabled DataProducts, but is: %s' % clusterNameSet) clusterName = clusterNameSet.pop() # Retrieve known cluster names (not all may be a valid storage target, but we cannot know...) knownClusterSet = {clus['name'] for clus in self.radbrpc.getResourceGroupNames('cluster')} logger.info('known clusters: %s', knownClusterSet) if clusterName not in knownClusterSet: raise Exception('skipping resource assignment for task with cluster name \'%s\' not in known clusters %s' % (clusterName, knownClusterSet)) except Exception as e: logger.error(str(e)) return def applySaneStartEndTime(): startTime = datetime.utcnow() + timedelta(minutes=1) maxPredecessorEndTime = self.getMaxPredecessorEndTime(specification_tree) if maxPredecessorEndTime is not None and maxPredecessorEndTime > startTime: startTime = maxPredecessorEndTime + timedelta(minutes=1) taskDuration = mainParset.getInt('Observation.Scheduler.taskDuration') # no default passed on purpose taskDuration = timedelta(seconds=taskDuration) if taskDuration > 0 else timedelta(hours=1) endTime = startTime + taskDuration logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', startTime, endTime, otdb_id) logger.info('uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', startTime, endTime, otdb_id) self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.startTime': startTime.strftime('%Y-%m-%d %H:%M:%S'), 'LOFAR.ObsSW.Observation.stopTime': endTime.strftime('%Y-%m-%d %H:%M:%S')}) return startTime, endTime # TODO: don't fix this crap here. Bad start/stop time has to go to error, like any other bad spec part. Fix the cause! Idem for MoM fix up below. try: startTime = datetime.strptime(mainParset.getString('Observation.startTime'), '%Y-%m-%d %H:%M:%S') endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S') if startTime < datetime.utcnow(): raise ValueError('parsed startTime (UTC) lies in the past (or large system clock offset)') # same exc type as strptime except ValueError: logger.warning('cannot parse start/stop time from specification for otdb_id=%s. Searching for sane defaults...', otdb_id) try: startTime, endTime = applySaneStartEndTime() if startTime < datetime.utcnow(): raise Exception('"sane" start/stop time turned out to be insane') except Exception as e: logger.error(str(e)) return try: # fix for MoM bug introduced before NV's holiday # MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4 # so, override it here if needed, and update to otdb processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '') if processingClusterName != clusterName: logger.info('overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' for otdb_id=%s', processingClusterName, clusterName, otdb_id) self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName }) except Exception as e: logger.error(str(e)) return logger.info('doAssignment: Accepted specification') # insert new task and specification in the radb # any existing specification and task with same otdb_id will be deleted automatically momId = mainParset.getInt('Observation.momID', -1) taskType = specification_tree.get('task_type', '') # i.e. u'observation' or u'pipeline logger.info('doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s cluster=%s' % (momId, otdb_id, status, taskType, startTime, endTime, clusterName)) try: result = self.radbrpc.insertSpecificationAndTask(momId, otdb_id, status, taskType, startTime, endTime, str(mainParset), clusterName) except Exception as e: logger.error(str(e)) result = dict() # handled next if not result['inserted']: logger.error('could not insert specification and task: result = %s', result) return specificationId = result['specification_id'] taskId = result['task_id'] logger.info('doAssignment: inserted specification (id=%s) and task (id=%s)' % (specificationId, taskId)) if status != 'prescheduled': # should only happen for approved logger.info('skipping resource assignment for task otdb_id=%s, because status=%s' % (otdb_id, status)) return task = None try: task = self.radbrpc.getTask(taskId) except Exception as e: logger.error(str(e)) return logger.info('doAssignment: task=%s', task) # From now on we can and have to _sendStateChange() to 'error' or otherwise on unrecov errors errStatus = 'error' try: self.processPredecessors(task) self.processSuccessors(task) except Exception as e: logger.exception(str(e)) self._sendStateChange(task, errStatus) return # Request estimates of needed resources from Resource Estimator. Check reply. try: reReply, rerpcStatus = self.rerpc({"specification_tree" : specification_tree}, timeout=10) logger.info('doAssignment: Resource Estimator reply = %s', reReply) if str(otdb_id) not in reReply: raise Exception("no otdb_id %s found in estimator results %s" % (otdb_id, reReply)) estimates = reReply[str(otdb_id)] if taskType not in estimates: raise Exception("no task type %s found in estimator results %s" % (taskType, estimates)) estimates = estimates[taskType] if 'errors' in estimates and estimates['errors']: for error in estimates['errors']: logger.error("Error from Resource Estimator: %s", error) raise Exception("Error(s) in estimator for otdb_id=%s radb_id=%s" % (otdb_id, taskId)) if 'estimates' not in estimates or any('resource_types' not in est for est in estimates['estimates']): raise Exception("missing 'resource_types' in 'estimates' in estimator results: %s" % estimates) estimates = estimates['estimates'] if not all(est_val > 0 for est in estimates for est_val in est['resource_types'].values()): # Avoid div by 0 and inf looping from estimate < 0 later on. raise Exception("at least one of the estimates is not a positive number") except Exception as e: logger.error(str(e)) self._sendStateChange(task, errStatus) return # get resources and related info from radb try: db_resource_group_mships = self.radbrpc.getResourceGroupMemberships() db_rgp2rgp = db_resource_group_mships['groups'] # resource-group-to-resource-group relations # resource-to-resource-group relations are under db_resource_group_mships['resources'] db_resource_list = self.radbrpc.getResources(include_availability=True) db_resource_types = {rt['name']:rt['id'] for rt in self.radbrpc.getResourceTypes()} db_resource_prop_types = {rcpt['name']:rcpt['id'] for rcpt in self.radbrpc.getResourceClaimPropertyTypes()} db_resource_max_fill_ratios = self.radbrpc.getResourceAllocationConfig(sql_like_name_pattern='max_fill_ratio_%') except Exception as e: logger.error(str(e)) self._sendStateChange(task, errStatus) return self.applyMaxFillRatios(db_resource_list, db_rgp2rgp, db_resource_types, db_resource_max_fill_ratios) # Assume estimates are (close enough to) accurate to determine resource claims for this task. # Try to get a set of non-conflicting claims from availability info in the RA DB. # If that fails, insert no claims and set the task straight to conflict. # Also, inserted claims are still automatically validated, there can be a race. # If not enough resources are available after all, claims are put to conflict status. # If any claim is in conflict state, then the task is put to conflict status as well. claims = self.getClaimsForTask(task, estimates, db_resource_list, db_rgp2rgp, db_resource_types, db_resource_prop_types) if claims is None: self._sendStateChange(task, 'error') return if claims: logger.info('doAssignment: inserting %d claims in the radb: %s' % (len(claims), claims)) try: claims = self.__insert_rcu_specifications_and_add_rcu_id_references_to_claims(claims) claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids'] logger.info('doAssignment: %d claims were inserted in the radb' % len(claim_ids)) if len(claim_ids) != len(claims): raise Exception('doAssignment: too few claims were inserted in the radb') conflictingClaims = self.radbrpc.getResourceClaims(task_ids=taskId, status='conflict') if conflictingClaims: # Will do _sendStateChange(task, 'conflict'), and radb sets task status to conflict automatically raise Exception('doAssignment: Task cannot be scheduled, because of %d conflicting claims: %s' % (len(conflictingClaims), conflictingClaims)) logger.info('doAssignment: all resources for task %s were succesfully claimed. Setting claim statuses to allocated' % (taskId,)) self.radbrpc.updateTaskAndResourceClaims(taskId, claim_status='allocated') except Exception as e: logger.error(str(e)) self._sendStateChange(task, 'conflict') return # remove any output and/or intermediate data for restarting pipelines if task['type'] == 'pipeline': try: du_result = self.sqrpc.getDiskUsageForOTDBId(task['otdb_id'], include_scratch_paths=True, force_update=True) if du_result['found'] and du_result.get('disk_usage', 0) > 0: logger.info("removing data on disk from previous run for otdb_id %s", otdb_id) result = self.curpc.removeTaskData(task['otdb_id']) if not result['deleted']: logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s", otdb_id, result['message']) except Exception as e: logger.error(str(e)) # in line with failure as warning just above: allow going to scheduled state here too # send notification that the task was scheduled self._sendStateChange(task, 'scheduled') def __insert_rcu_specifications_and_add_rcu_id_references_to_claims(self, claims): """ Insert RCU specifications for claims that require it and construct a list of references to them :param claims: :return: claims with updated claim['rcu_id'] field """ rcu_ids = [] rcu_bit_patterns = [c['rcu_bit_pattern'] for c in claims if c['rcu_bit_pattern'] is not None] _rcu_ids = self.radbrpc.insertRcuSpecifications(rcu_bit_patterns) _rcu_ids_idx = 0 for c in claims: # TODO: Check if 'None' will become 'NULL' in the RADB? c['rcu_id'] = _rcu_ids[_rcu_ids_idx] if c['rcu_bit_pattern'] is not None else None _rcu_ids_idx += 1 return claims def _sendStateChange(self, task, status): if status == 'scheduled' or status == 'conflict' or status == 'error': content={'radb_id': task['id'], 'otdb_id':task['otdb_id'], 'mom_id': task['mom_id']} subject= 'Task' + status[0].upper() + status[1:] else: # should not end up here (bug) logger.error('_sendStateChange(): bug: Not sending notification as status is %s' % status) return try: if status != 'scheduled': # another service sets the parset spec in otdb, and updated otdb task status to scheduled, which is then synced to radb self.radbrpc.updateTask(task['id'], task_status=status) msg = EventMessage(context=self.ra_notification_prefix + subject, content=content) logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) self.ra_notification_bus.send(msg) except Exception as e: logger.error(str(e)) def processPredecessors(self, task): mom_id = task['mom_id'] predecessor_ids = self.momrpc.getPredecessorIds(mom_id) if str(mom_id) not in predecessor_ids or not predecessor_ids[str(mom_id)]: logger.info('no predecessors for otdb_id=%s mom_id=%s', task['otdb_id'], mom_id) return predecessor_mom_ids = predecessor_ids[str(mom_id)] logger.info('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, task['mom_id'], task['otdb_id']) for predecessor_mom_id in predecessor_mom_ids: # check if the predecessor needs to be linked to this task predecessor_task = self.radbrpc.getTask(mom_id=predecessor_mom_id) if predecessor_task: if predecessor_task['id'] not in task['predecessor_ids']: logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s otdb_id=%s', predecessor_task['mom_id'], predecessor_task['otdb_id'], task['mom_id'], task['otdb_id']) self.radbrpc.insertTaskPredecessor(task['id'], predecessor_task['id']) else: logger.warning('could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', predecessor_mom_id, task['otdb_id']) def processSuccessors(self, task): mom_id = task['mom_id'] successor_ids = self.momrpc.getSuccessorIds(mom_id) if str(mom_id) not in successor_ids or not successor_ids[str(mom_id)]: logger.info('no successors for otdb_id=%s mom_id=%s', task['otdb_id'], mom_id) return successor_mom_ids = successor_ids[str(mom_id)] logger.info('processing successor mom_ids=%s for mom_id=%s otdb_id=%s', successor_mom_ids, task['mom_id'], task['otdb_id']) for successor_mom_id in successor_mom_ids: # check if the successor needs to be linked to this task successor_task = self.radbrpc.getTask(mom_id=successor_mom_id) if successor_task: if successor_task['id'] not in task['successor_ids']: logger.info('connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s otdb_id=%s', successor_task['mom_id'], successor_task['otdb_id'], task['mom_id'], task['otdb_id']) self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id']) movePipelineAfterItsPredecessors(successor_task, self.radbrpc) else: logger.warning('could not find successor task with mom_id=%s in radb for task otdb_id=%s', successor_mom_id, task['otdb_id']) def getMaxPredecessorEndTime(self, specification_tree): predecessor_specs = [parameterset(tree['specification']) for tree in specification_tree['predecessors']] predecessor_endTimes = [datetime.strptime(spec.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S') for spec in predecessor_specs] if predecessor_endTimes: return max(predecessor_endTimes) return None def getClusterNames(self, parset): """ Return set of storage cluster names for all enabled output data product types in parset, or raise for an enabled output data product type without storage cluster name. """ clusterNames = set() keys = ['Output_Correlated', 'Output_IncoherentStokes', 'Output_CoherentStokes', 'Output_InstrumentModel', 'Output_SkyImage', 'Output_Pulsar'] for key in keys: if parset.getBool('Observation.DataProducts.%s.enabled' % key, False): name = parset.getString('Observation.DataProducts.%s.storageClusterName' % key) # may raise; don't pass default arg clusterNames.add(name) return clusterNames def applyMaxFillRatios(self, db_resource_list, db_rgp2rgp, db_resource_types, db_resource_max_fill_ratios): ''' Applies db_resource_max_fill_ratios to db_resource_list. db_resource_max_fill_ratios is e.g. [{'name': max_fill_ratio_CEP4_storage, 'value': 0.85}, ...] ''' prefix = 'max_fill_ratio_' # + resource_group_name + '_' + resource_type_name for ratio_dict in db_resource_max_fill_ratios: for res_type_name in db_resource_types: if not ratio_dict['name'].endswith('_' + res_type_name): continue res_type_id = db_resource_types[res_type_name] res_group_name = ratio_dict['name'][len(prefix) : -len(res_type_name)-1] res_group_id = self.getResourceGroupIdByName(db_rgp2rgp, res_group_name) if res_group_id is None: logger.warn('applyMaxFillRatios: could not find resource group for %s', ratio_dict['name']) break res_group = db_rgp2rgp[res_group_id] for res_id in res_group['resource_ids']: res = db_resource_list[res_id] if res['type_id'] != res_type_id: continue try: ratio = float(ratio_dict['value']) if ratio < 0.0 or ratio > 1.0: raise ValueError('value not a float in range [0.0, 1.0]') except ValueError as err: logger.error('applyMaxFillRatios: %s = %s: %s', ratio_dict['name'], ratio_dict['value'], str(err)) break res['available_capacity'] = min(res['available_capacity'], int(ratio * res['total_capacity'])) logger.info('applyMaxFillRatios: applied %s = %f', ratio_dict['name'], ratio) def getClaimsForTask(self, task, needed_resources_list, db_resource_list, db_rgp2rgp, db_resource_types, db_resource_prop_types): """ Return claims that satisfy needed_resources_list within db_resource_list, or an empty claim list if no non-conflicting claims could be found, or None on error. :param task: an instance of an RADB task object :param needed_resources_list: a list of resources to be claimed :param db_resource_list: all resources in RADB with availability information :param db_rgp2rgp: all group->group relations from RADB :param db_resource_types: all virtual instrument resource types (and their units) from RADB :param db_resource_prop_types: all resource claim property types from RADB :returns claims """ # This function selects resources for a task (i.e. obs or pipeline). Keep it side-effect free! # Criteria: # * It all fits within max fill ratio per resource group (or consider failed) # * Avoid resources marked as unavailable # * At most one claim per resource; i.e. merge where possible (DB friendly) # * Most pipelines reduce data size. Observation output is relatively large, # so spread across as many storage areas as possible (if storage is needed). # * Only completely fill a (e.g. storage) resource when inevitable, as this also makes other # resources (e.g. write bandwidth) unavailable for future tasks until clean up. # * All 4 data products of complex voltage obs data (XXYY) go to a single (storage) resource. # # Parts of these criteria may be solved by the caller, e.g. by passing filtered arguments. # # Note: certain (obs) settings are more flexible than ever used (e.g. sb list per SAP for CS/IS). # We must support the full gamut of settings, but for scenarios that are complex *and* constrained, # producing 'conflict' may be ok, if this saves implementation or excessive computational complexity. # Users will then have to free more resources than strictly necessary before retrying. logger.debug('getClaimsForTask: db_rgp2rgp: %s', db_rgp2rgp) # big! logger.debug('getClaimsForTask: db_resource_list: %s', db_resource_list) # big! logger.debug('getClaimsForTask: db_resource_types: %s', db_resource_types) logger.debug('getClaimsForTask: db_resource_prop_types: %s', db_resource_prop_types) dtypes = {'uv', 'cs', 'is', 'im', 'img', 'pulp'} summablePropTypeIds = {db_resource_prop_types['nr_of_' + dt + '_files'] for dt in dtypes} db_storage_type_id = db_resource_types['storage'] # for sorting and to attach properties db_rcu_type_id = db_resource_types['rcu'] # for attaching rcu bit field to the claim claims = [] for needed_resources in needed_resources_list: # Replace resource names by type ids: easy matching w/ other data structs needed_resources_by_type_id = {db_resource_types[name]: needed_resources['resource_types'][name] \ for name in needed_resources['resource_types']} # e.g. {3: 16536, 5: 170016} logger.info('getClaimsForTask: needed_resources_by_type_id: %s', needed_resources_by_type_id) # Find group id ('gid') of needed_resources['root_resource_group'], # then get list of claimable resources at root_gid and its children root_gid = self.getResourceGroupIdByName(db_rgp2rgp, needed_resources['root_resource_group']) if root_gid is None: logger.error('getClaimsForTask: cannot find resources to claim: unknown root_resource_group \'%s\'', needed_root_resource_group_name) return None claimable_resources_list = self.getSubtreeResourcesList(root_gid, needed_resources_by_type_id, db_resource_list, db_rgp2rgp) # e.g. [{3: <resource_dict>, 5: <resource_dict>}, ...] logger.info('getClaimsForTask: considering %d claimable resource dict(s)', len(claimable_resources_list)) logger.debug('getClaimsForTask: claimable_resources_list: %s', claimable_resources_list) input_files = needed_resources.get('input_files') output_files = needed_resources.get('output_files') properties = self.getProperties(db_resource_prop_types, input_files, 'input') properties.extend(self.getProperties(db_resource_prop_types, output_files, 'output')) # Collapse needed resources if only 1 claimable resource dict, e.g. global filesystem if len(claimable_resources_list) == 1: logger.info('getClaimsForTask: collapsing needed_resources') for type_id in needed_resources_by_type_id: needed_resources_by_type_id[type_id] *= needed_resources['resource_count'] for prop in properties: if prop['type'] in summablePropTypeIds: prop['value'] *= needed_resources['resource_count'] needed_resources['resource_count'] = 1 if db_storage_type_id in needed_resources_by_type_id: sort_res_type = db_storage_type_id else: sort_res_type = needed_resources_by_type_id.keys()[0] # some other if not storage for _ in xrange(needed_resources['resource_count']): # Sorting on every change may be slow, but for 100s of DPs, insertion of merged claims is still 3-5x slower. # A heapq was not faster, yet solving the lack of total ordering more elaborate. # Of course, big-O complexity here is terrible, but we are nowhere near (too) big. claimable_resources_list.sort(key=lambda res: res[sort_res_type]['available_capacity'], reverse=True) # Almost always iterates once. Still needed to match >1 resource types. claim = None for claimable_resources_dict in claimable_resources_list: if self.isClaimable(needed_resources_by_type_id, claimable_resources_dict): claim = self.makeClaim(db_resource_prop_types, db_storage_type_id, db_rcu_type_id, task, properties, needed_resources_by_type_id, claimable_resources_dict) logger.debug('getClaimsForTask: created claim: %s', claim) claims.extend(claim) break if claim is None: logger.warn('getClaimsForTask: Failed to find enough claimable resources for %d x %s', needed_resources['resource_count'], needed_resources_by_type_id) return {} self.mergeClaims(summablePropTypeIds, claims) return claims def getResourceGroupIdByName(self, db_rgp2rgp, name): """ Returns group id of resource group named name, or None if name was not found. The search happens breadth-first. """ gids = [0] # root group id 0 i = 0 while i < len(gids): # careful iterating while modifying res_group = db_rgp2rgp[gids[i]] if res_group['resource_group_name'] == name: return gids[i] gids.extend(res_group['child_ids']) i += 1 return None def getSubtreeResourcesList(self, root_gid, needed_resources_by_type_id, db_resource_list, db_rgp2rgp): """ Returns list of available resources of type id in needed_resources_by_type_id.keys() starting at group id root_gid in the format [{type_id: {<resource_dict>}, ...}, ...]. """ # Search breadth-first starting at root_gid. gids = [root_gid] resources_list = [] i = 0 while i < len(gids): # careful iterating while modifying resources = {} type_ids_seen = set() res_group = db_rgp2rgp[gids[i]] for rid in res_group['resource_ids']: type_id = db_resource_list[rid]['type_id'] if type_id in needed_resources_by_type_id and db_resource_list[rid]['active'] and \ db_resource_list[rid]['available_capacity'] > 0: resources[type_id] = db_resource_list[rid] type_ids_seen.add(type_id) # Only add resource IDs if all needed types are present in this resource group if type_ids_seen == set(needed_resources_by_type_id): resources_list.append(resources) gids.extend(res_group['child_ids']) i += 1 return resources_list def isClaimable(self, needed_resources, claimable_resources): """ Returns whether all needed_resources can be claimed from claimable_resources. Format needed_resources: {resource_type_id: size, ...} Format claimable_resources: {resource_type_id: {<resource_dict>}, ...} """ return all(claim_size <= claimable_resources[res_type]['available_capacity'] \ for res_type, claim_size in needed_resources.items()) def makeClaim(self, db_resource_prop_types, db_storage_type_id, db_rcu_type_id, task, properties, needed_resources, claimable_resources): """ Returns list of claims for a data product (one for each needed resource type). Note: this function also updates claimable_resources. Format needed_resources: {resource_type_id: size, ...} Format claimable_resources: {resource_type_id: {<resource_dict>}, ...} """ claims = [] for res_type, claim_size in needed_resources.items(): # RCU claim size as returned by the ResourceEstimator is actually a bit pattern (encoding which of a # station's RCUs are requested to take part in a measurement and which not). In order to have it countable # (as is expected of a claim size) it needs to be replaced with the number of RCUs requested. Subsequently, # the bit pattern information is stored with the claim separately if res_type == db_rcu_type_id: rcu_bit_pattern = needed_resources[db_rcu_type_id] # TODO: determine if .copy() is needed here claim_size = rcu_bit_pattern.count('1') claim['rcu_bit_pattern'] = rcu_bit_pattern claim = {'starttime': task['starttime'], 'endtime': task['endtime'], 'status': 'claimed'} claim['resource_id'] = claimable_resources[res_type]['id'] claim['claim_size'] = claim_size claimable_resources[res_type]['available_capacity'] -= claim_size if res_type == db_storage_type_id: # FIXME: find proper way to extend storage time with a year # 2016-09-27 scisup would like to be involved in chosing these kind of defaults # and what to do after the claim expires # we now choose a default period of a year, and do nothing if the claim expires claim['endtime'] += timedelta(days=365) if properties: claim['properties'] = properties claims.append(claim) return claims def getProperties(self, db_resource_prop_types, files_dict, io_type): """ Return list of properties in claim format converted from files_dict. """ if files_dict is None: return [] logger.info('getProperties: processing %s_files: %s', io_type, files_dict) properties = [] for group_name, needed_prop_group in files_dict.items(): # e.g. 'cs', {'...': 123, ...} or 'saps', [{...}, {...}] if group_name == 'saps': for sap_dict in needed_prop_group: props = self.makeProperties(db_resource_prop_types, sap_dict['properties'], sap_dict['sap_nr'], io_type) properties.extend(props) else: props = self.makeProperties(db_resource_prop_types, needed_prop_group, None, io_type) properties.extend(props) return properties def makeProperties(self, db_resource_prop_types, properties_dict, sap_nr, io_type): """ helper for getProperties() """ properties = [] for prop_type_name, prop_value in properties_dict.items(): rc_property_type_id = db_resource_prop_types.get(prop_type_name) if rc_property_type_id is None: logger.warn('makeProperties: unknown prop_type: %s', prop_type_name) continue prop = {'type': rc_property_type_id, 'value': prop_value, 'io_type': io_type} if sap_nr is not None: prop['sap_nr'] = sap_nr properties.append(prop) return properties # TODO: check and decide if we need to be able to merge claims based on claim[rcu_bit_pattern] def mergeClaims(self, summablePropTypeIds, claims): """ Merge claims allocated onto the same resources. To merge claim properties, summablePropTypeIds is used. """ logger.info('mergeClaims: merging claims for the same resource across %d claims', len(claims)) claims.sort( key=lambda claim: (claim['resource_id'], claim.get('properties')) ) i = 1 while i < len(claims): # careful iterating while modifying if claims[i-1]['resource_id'] == claims[i]['resource_id']: # Merge claim_size and props; starttime and endtime are always equal for the same resource_id claims[i-1]['claim_size'] += claims[i]['claim_size'] if 'properties' in claims[i] and len(claims[i]['properties']): if 'properties' not in claims[i-1]: claims[i-1]['properties'] = [] else: # shallow copy to avoid aliasing; mergeResourceProperties() does the rest claims[i-1]['properties'] = list(claims[i-1]['properties']) self.mergeResourceProperties(summablePropTypeIds, claims[i-1]['properties'], claims[i]['properties']) claims.pop(i) # can be more efficient O()-wise else: i += 1 def mergeResourceProperties(self, summablePropTypeIds, props0, props1): """ Ensure props0 contains all properties of props1. A property of type in summablePropTypeIds must have its value added. NOTE: caller has to ensure that 'not (props0 is props1)' holds. """ # Better datastructs could easily avoid this O(N^2). len(props) is ~5 props0len = len(props0) for p1 in props1: found = False i = 0 while i < props0len: # careful iterating while modifying p0 = props0[i] if p0['type'] == p1['type'] and \ p0.get('sap_nr') == p1.get('sap_nr') and \ p0['io_type'] == p1['io_type']: if p0['type'] in summablePropTypeIds: # same and need to sum values props0[i] = dict(props0[i]) # copy to avoid changing p1 too if refers to same obj props0[i]['value'] += p1['value'] elif p0['value'] != p1['value']: logger.warn('mergeResourceProperties: unexpected same prop pair, but with different values: %s and %s', p0, p1) found = True break i += 1 if not found: props0.append(p1)