-
Jan David Mol authoredJan David Mol authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
assignment.py 44.80 KiB
#!/usr/bin/env python
# Copyright (C) 2015-2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: assignment.py 1580 2015-09-30 14:18:57Z loose $
"""
ResourceAssigner inserts/updates tasks and assigns resources to them based on the incoming parset.
"""
import logging
from datetime import datetime, timedelta
import time
from lofar.common.util import humanreadablesize
from lofar.common.cache import cache
from lofar.common.datetimeutils import totalSeconds, parseDatetime
from lofar.messaging.messages import EventMessage
from lofar.messaging.messagebus import ToBus
from lofar.messaging.RPC import RPC, RPCException
from lofar.parameterset import parameterset
from lofar.sas.resourceassignment.resourceassigner.schedulechecker import movePipelineAfterItsPredecessors
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.sas.datamanagement.storagequery.rpc import StorageQueryRPC
from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME as DEFAULT_STORAGEQUERY_BUSNAME
from lofar.sas.datamanagement.storagequery.config import DEFAULT_SERVICENAME as DEFAULT_STORAGEQUERY_SERVICENAME
from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC
from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME
from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME
# Module-level logger (named after this module) used by all code in this file.
logger = logging.getLogger(__name__)
class ResourceAssigner():
def __init__(self,
             radb_busname=RADB_BUSNAME,
             radb_servicename=RADB_SERVICENAME,
             re_busname=RE_BUSNAME,
             re_servicename=RE_SERVICENAME,
             otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
             otdb_servicename=DEFAULT_OTDB_SERVICENAME,
             storagequery_busname=DEFAULT_STORAGEQUERY_BUSNAME,
             storagequery_servicename=DEFAULT_STORAGEQUERY_SERVICENAME,
             cleanup_busname=DEFAULT_CLEANUP_BUSNAME,
             cleanup_servicename=DEFAULT_CLEANUP_SERVICENAME,
             ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME,
             ra_notification_prefix=DEFAULT_RA_NOTIFICATION_PREFIX,
             mom_busname=DEFAULT_MOMQUERY_BUSNAME,
             mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
             broker=None):
    """
    ResourceAssigner inserts/updates tasks in the radb and assigns resources to it based on incoming parset.

    Creates one RPC client per service plus the notification bus; use open()/close()
    or a 'with' statement to open and close them.

    :param radb_busname: busname on which the radb service listens (default: RADB_BUSNAME)
    :param radb_servicename: servicename of the radb service (default: RADB_SERVICENAME)
    :param re_busname: busname on which the resource estimator service listens (default: RE_BUSNAME)
    :param re_servicename: servicename of the resource estimator service (default: RE_SERVICENAME)
    :param otdb_busname: busname for the otdb rpc (default: DEFAULT_OTDB_SERVICE_BUSNAME)
    :param otdb_servicename: servicename for the otdb rpc (default: DEFAULT_OTDB_SERVICENAME)
    :param storagequery_busname: busname for the storage query rpc (default: DEFAULT_STORAGEQUERY_BUSNAME)
    :param storagequery_servicename: servicename for the storage query rpc (default: DEFAULT_STORAGEQUERY_SERVICENAME)
    :param cleanup_busname: busname for the cleanup rpc (default: DEFAULT_CLEANUP_BUSNAME)
    :param cleanup_servicename: servicename for the cleanup rpc (default: DEFAULT_CLEANUP_SERVICENAME)
    :param ra_notification_busname: address of the bus on which state change notifications are sent
                                    (default: DEFAULT_RA_NOTIFICATION_BUSNAME)
    :param ra_notification_prefix: prefix put in front of each notification subject
                                   (default: DEFAULT_RA_NOTIFICATION_PREFIX)
    :param mom_busname: busname for the MoM query rpc (default: DEFAULT_MOMQUERY_BUSNAME)
    :param mom_servicename: servicename for the MoM query rpc (default: DEFAULT_MOMQUERY_SERVICENAME)
    :param broker: Valid Qpid broker host (default: None, which means localhost)
    """
    # The radb, estimator, otdb and mom clients share this timeout; the storage query
    # and cleanup clients are created without an explicit timeout.
    rpc_timeout = 180

    self.radbrpc = RARPC(busname=radb_busname, servicename=radb_servicename,
                         broker=broker, timeout=rpc_timeout)
    self.rerpc = RPC(re_servicename, busname=re_busname,
                     broker=broker, ForwardExceptions=True, timeout=rpc_timeout)
    # , ForwardExceptions=True hardcoded in RPCWrapper right now
    self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename,
                           broker=broker, timeout=rpc_timeout)
    self.momrpc = MoMQueryRPC(busname=mom_busname, servicename=mom_servicename,
                              broker=broker, timeout=rpc_timeout)
    self.sqrpc = StorageQueryRPC(busname=storagequery_busname,
                                 servicename=storagequery_servicename,
                                 broker=broker)
    self.curpc = CleanupRPC(busname=cleanup_busname,
                            servicename=cleanup_servicename,
                            broker=broker)

    self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker)
    self.ra_notification_prefix = ra_notification_prefix
def __enter__(self):
"""Internal use only. (handles scope 'with')"""
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Internal use only. (handles scope 'with')"""
self.close()
def open(self):
    """Open all rpc connections and the notification bus, in a fixed order."""
    connections = (self.radbrpc,
                   self.rerpc,
                   self.otdbrpc,
                   self.momrpc,
                   self.sqrpc,
                   self.curpc,
                   self.ra_notification_bus)
    for connection in connections:
        connection.open()
def close(self):
    """Close all rpc connections and the notification bus, in the same order as open()."""
    connections = (self.radbrpc,
                   self.rerpc,
                   self.otdbrpc,
                   self.momrpc,
                   self.sqrpc,
                   self.curpc,
                   self.ra_notification_bus)
    for connection in connections:
        connection.close()
@staticmethod
def taskTypeProducesOutput( type, subtype ):
return type != 'reservation'
@property
@cache
def resource_group_relations(self):
    """ Returns the resource-group-to-resource-group relations as reported by the radb:
        the 'groups' part of the getResourceGroupMemberships() reply.

        Each dict element has the resource group id as key, and the following value:
        { "child_ids":    list of child resource groups
          "parent_ids":   list of parent resource groups
          "resource_ids": list of resources in this group. }

        NOTE(review): other methods in this class also read 'resource_group_name' from
        these values; the exact reply schema is defined by the radb service - confirm there. """
    return self.radbrpc.getResourceGroupMemberships()['groups']
@property
@cache
def resource_types(self):
    """ Returns a name->id dict built from all resource types known to the radb. """
    name_to_id = {}
    for resource_type in self.radbrpc.getResourceTypes():
        name_to_id[resource_type['name']] = resource_type['id']
    return name_to_id
@property
@cache
def resource_claim_property_types(self):
    """ Returns a name->id dict built from all resource claim property types known to the radb. """
    name_to_id = {}
    for property_type in self.radbrpc.getResourceClaimPropertyTypes():
        name_to_id[property_type['name']] = property_type['id']
    return name_to_id
def doAssignment(self, specification_tree):
    """ Insert/update the task described by specification_tree in the radb and, for
    prescheduled tasks, try to claim the resources it needs.

    Flow:
      1. Skip specifications whose state is not 'approved' or 'prescheduled'.
      2. For task types that produce output, determine the single storage cluster and
         make sure the radb knows it; push the cluster name to otdb if it differs from
         the processing cluster name in the parset.
      3. Parse start/stop time; if the start time lies in the past, generate new times
         and upload them to otdb.
      4. Insert specification and task in the radb. 'approved' tasks stop here.
      5. Link predecessors/successors, ask the resource estimator for estimates, fetch
         resources from the radb, compute claims and insert them.
      6. For pipelines, remove data on disk left by a previous run.
      7. Send a 'scheduled' notification.

    Errors before the task is fetched from the radb are only logged. From then on
    every failure results in _sendStateChange(task, 'error') or
    _sendStateChange(task, 'conflict').

    :param specification_tree: dict; keys read here are 'otdb_id', 'state' (optional),
                               'specification', 'task_type', 'task_subtype' (optional)
                               and, when new start/end times are generated, 'predecessors'.
                               It is also passed unmodified to the resource estimator.
    :returns: None
    """
    logger.info('doAssignment: specification_tree=%s' % (specification_tree))

    otdb_id = specification_tree['otdb_id']
    status = specification_tree.get('state', '').lower()
    if status not in ['approved', 'prescheduled']: # For approved we only do a few checks and put it in the RADB
        logger.warning('skipping specification for task otdb_id=%s, because status=%s (not prescheduled)', otdb_id, status)
        return

    try:
        mainParset = parameterset(specification_tree['specification'])
    except Exception as e:
        logger.error(str(e))
        return

    taskType = specification_tree['task_type'] # is required item
    if 'task_subtype' in specification_tree: # is optional item
        taskSubtype = specification_tree['task_subtype']
    else:
        taskSubtype = ''

    if self.taskTypeProducesOutput( taskType , taskSubtype):
        # Only assign resources for task output to known clusters
        try:
            clusterNameSet = self.getClusterNames(mainParset)
            if str() in clusterNameSet or len(clusterNameSet) != 1:
                # Empty set or name is always an error.
                # TODO: To support >1 cluster per obs,
                # self.radbrpc.insertSpecificationAndTask() as called below and the radb would need to take >1 cluster name
                # Also, there is only 1 processingClusterName in the parset, but we do not always want to pipeline process all obs outputs, or not on 1 cluster
                raise Exception('clusterNameSet must have a single non-empty name for all enabled DataProducts, but is: %s' % clusterNameSet)
            clusterName = clusterNameSet.pop()

            # Retrieve known cluster names (not all may be a valid storage target, but we cannot know...)
            knownClusterSet = {clus['name'] for clus in self.radbrpc.getResourceGroupNames('cluster')}
            logger.info('known clusters: %s', knownClusterSet)
            if clusterName not in knownClusterSet:
                raise Exception('skipping resource assignment for task with cluster name \'%s\' not in known clusters %s' % (clusterName, knownClusterSet))
        except Exception as e:
            logger.error(str(e))
            return

        try:
            # fix for MoM bug introduced before NV's holiday
            # MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4
            # so, override it here if needed, and update to otdb
            processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '')
            if processingClusterName != clusterName:
                logger.info('overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' for otdb_id=%s',
                            processingClusterName, clusterName, otdb_id)
                self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName })
        except Exception as e:
            logger.error(str(e))
            return
    else:
        # task has no output
        clusterName = ''

    def applySaneStartEndTime(origStartTime, origEndTime):
        """ Generate a start time a few minutes from now (or after the latest predecessor
        end time, if that is later), keep the original duration (1 hour if the original
        end is not after the original start), upload both times to otdb and return them. """
        startTime = datetime.utcnow() + timedelta(minutes=3)

        maxPredecessorEndTime = self.getMaxPredecessorEndTime(specification_tree)
        if maxPredecessorEndTime is not None and maxPredecessorEndTime > startTime:
            startTime = maxPredecessorEndTime + timedelta(minutes=3)

        taskDuration = timedelta(seconds=totalSeconds(origEndTime - origStartTime)) if origEndTime > origStartTime else timedelta(hours=1)

        endTime = startTime + taskDuration

        logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s',
                       startTime, endTime, otdb_id)

        logger.info('uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', startTime, endTime, otdb_id)
        self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.startTime': startTime.strftime('%Y-%m-%d %H:%M:%S'),
                                                     'LOFAR.ObsSW.Observation.stopTime': endTime.strftime('%Y-%m-%d %H:%M:%S')})
        return startTime, endTime

    # TODO: don't fix this crap here. Bad start/stop time has to go to error, like any other bad spec part. Fix the cause! Idem for MoM fix up below.
    try:
        startTime = parseDatetime(mainParset.getString('Observation.startTime'))
        endTime = parseDatetime(mainParset.getString('Observation.stopTime'))
        if startTime < datetime.utcnow():
            startTime, endTime = applySaneStartEndTime(startTime, endTime)
    except Exception as e:
        logger.error(str(e))
        return

    logger.info('doAssignment: Accepted specification')

    # insert new task and specification in the radb
    # any existing specification and task with same otdb_id will be deleted automatically
    momId = mainParset.getInt('Observation.momID', -1)
    logger.info('doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s cluster=%s' %
                (momId, otdb_id, status, taskType, startTime, endTime, clusterName))
    try:
        result = self.radbrpc.insertSpecificationAndTask(momId, otdb_id, status, taskType,
                                                         startTime, endTime, str(mainParset), clusterName)
    except Exception as e:
        logger.error(str(e))
        result = {'inserted': False} # handled next
    if not result['inserted']:
        logger.error('could not insert specification and task: result = %s', result)
        return

    specificationId = result['specification_id']
    taskId = result['task_id']
    logger.info('doAssignment: inserted specification (id=%s) and task (id=%s)' % (specificationId, taskId))

    if status != 'prescheduled': # should only happen for approved
        logger.info('skipping resource assignment for task otdb_id=%s, because status=%s' % (otdb_id, status))
        return

    task = None
    try:
        task = self.radbrpc.getTask(taskId)
    except Exception as e:
        logger.error(str(e))
        return
    logger.info('doAssignment: task=%s', task)

    # From now on we can and have to _sendStateChange() to 'error' or otherwise on unrecov errors
    errStatus = 'error'

    try:
        self.processPredecessors(task)
        self.processSuccessors(task)
    except Exception as e:
        logger.exception(str(e))
        self._sendStateChange(task, errStatus)
        return

    # Request estimates of needed resources from Resource Estimator. Check reply.
    try:
        reReply, rerpcStatus = self.rerpc({"specification_tree" : specification_tree}, timeout=10)
        logger.info('doAssignment: Resource Estimator reply = %s', reReply)

        if str(otdb_id) not in reReply:
            raise Exception("no otdb_id %s found in estimator results %s" % (otdb_id, reReply))
        estimates = reReply[str(otdb_id)]

        if taskType not in estimates:
            raise Exception("no task type %s found in estimator results %s" % (taskType, estimates))
        estimates = estimates[taskType]

        if 'errors' in estimates and estimates['errors']:
            for error in estimates['errors']:
                logger.error("Error from Resource Estimator: %s", error)
            raise Exception("Error(s) in estimator for otdb_id=%s radb_id=%s" % (otdb_id, taskId))

        if 'estimates' not in estimates or any('resource_types' not in est for est in estimates['estimates']):
            raise Exception("missing 'resource_types' in 'estimates' in estimator results: %s" % estimates)
        estimates = estimates['estimates']

        if not all(est_val > 0 for est in estimates for est_val in est['resource_types'].values()):
            # Avoid div by 0 and inf looping from estimate <= 0 later on.
            raise Exception("at least one of the estimates is not a positive number")
    except Exception as e:
        logger.error(str(e))
        self._sendStateChange(task, errStatus)
        return

    # get resources and related info from radb
    try:
        db_resource_list = self.radbrpc.getResources(include_availability=True)
        db_resource_max_fill_ratios = self.radbrpc.getResourceAllocationConfig(sql_like_name_pattern='max_fill_ratio_%')
    except Exception as e:
        logger.error(str(e))
        self._sendStateChange(task, errStatus)
        return

    # Assume estimates are (close enough to) accurate to determine resource claims for this task.
    # Try to get a set of non-conflicting claims from availability info in the RA DB.
    # If not everything fits, insert no claims and set the task to error.
    # Also, inserted claims are still automatically validated, there can be a race.
    # If not enough resources are available after all, claims are put to conflict status.
    # If any claim is in conflict state, then the task is put to conflict status as well.
    #
    # Both calls can raise (e.g. ValueError from getResourceGroupIdByName() for an unknown
    # resource group name). Guard them, so the task ends up in 'error' instead of being
    # left behind without a state change.
    try:
        self.applyMaxFillRatios(db_resource_list, db_resource_max_fill_ratios)
        all_fit, claims, _ = self.findClaims(estimates, db_resource_list)
    except Exception as e:
        logger.exception(str(e))
        self._sendStateChange(task, errStatus)
        return

    if not all_fit:
        self._sendStateChange(task, 'error')
        return

    if claims:
        # Complete the claims by annotating them with task information
        self.tieClaimsToTask(claims, task)

        logger.info('doAssignment: inserting %d claims in the radb: %s', len(claims), claims)
        try:
            claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids']
            logger.info('doAssignment: %d claims were inserted in the radb' % len(claim_ids))
            if len(claim_ids) != len(claims):
                raise Exception('doAssignment: too few claims were inserted in the radb')

            conflictingClaims = self.radbrpc.getResourceClaims(task_ids=taskId, status='conflict')
            if conflictingClaims:
                # Will do _sendStateChange(task, 'conflict'), and radb sets task status to conflict automatically
                raise Exception('doAssignment: Task cannot be scheduled, because of %d conflicting claims: %s' %
                                (len(conflictingClaims), conflictingClaims))

            logger.info('doAssignment: all resources for task %s were succesfully claimed. Setting claim statuses to allocated' % (taskId,))
            self.radbrpc.updateTaskAndResourceClaims(taskId, claim_status='allocated')
        except Exception as e:
            logger.error(str(e))
            self._sendStateChange(task, 'conflict')
            return

    # remove any output and/or intermediate data for restarting pipelines
    if task['type'] == 'pipeline':
        try:
            du_result = self.sqrpc.getDiskUsageForOTDBId(task['otdb_id'], include_scratch_paths=True, force_update=True)
            if du_result['found'] and du_result.get('disk_usage', 0) > 0:
                logger.info("removing data on disk from previous run for otdb_id %s", otdb_id)
                result = self.curpc.removeTaskData(task['otdb_id'])
                if not result['deleted']:
                    logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s", otdb_id, result['message'])
        except Exception as e:
            logger.error(str(e)) # in line with failure as warning just above: allow going to scheduled state here too

    # send notification that the task was scheduled
    self._sendStateChange(task, 'scheduled')
def tieClaimsToTask(self, claims, task):
    """ Annotate each claim (in place) with the start and end time of the task.

    Claims on a storage resource get an end time one year (365 days) past the task end.
    """
    storage_retention = timedelta(days=365)

    for claim in claims:
        # add start/endtime to claims
        claim['starttime'] = task['starttime']
        claim['endtime'] = task['endtime']

        is_storage_claim = claim['resource_type_id'] == self.resource_types['storage']
        if not is_storage_claim:
            continue

        # FIXME: find proper way to extend storage time with a year
        # 2016-09-27 scisup would like to be involved in chosing these kind of defaults
        # and what to do after the claim expires
        # we now choose a default period of a year, and do nothing if the claim expires
        claim['endtime'] = claim['endtime'] + storage_retention
def _sendStateChange(self, task, status):
if status == 'scheduled' or status == 'conflict' or status == 'error':
content={'radb_id': task['id'], 'otdb_id':task['otdb_id'], 'mom_id': task['mom_id']}
subject= 'Task' + status[0].upper() + status[1:]
else: # should not end up here (bug)
logger.error('_sendStateChange(): bug: Not sending notification as status is %s' % status)
return
try:
if status != 'scheduled':
# another service sets the parset spec in otdb, and updated otdb task status to scheduled, which is then synced to radb
self.radbrpc.updateTask(task['id'], task_status=status)
msg = EventMessage(context=self.ra_notification_prefix + subject, content=content)
logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' ')))
self.ra_notification_bus.send(msg)
except Exception as e:
logger.error(str(e))
def processPredecessors(self, task):
    """ Link task in the radb to each of its predecessors known to MoM.

    Predecessor mom ids come from the MoM query service. A predecessor that exists in
    the radb and is not yet in task['predecessor_ids'] gets linked; a predecessor that
    cannot be found in the radb only triggers a warning.
    """
    mom_id = task['mom_id']

    predecessor_mom_ids = self.momrpc.getPredecessorIds(mom_id).get(str(mom_id))
    if not predecessor_mom_ids:
        logger.info('no predecessors for otdb_id=%s mom_id=%s', task['otdb_id'], mom_id)
        return

    logger.info('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, task['mom_id'], task['otdb_id'])

    for predecessor_mom_id in predecessor_mom_ids:
        # check if the predecessor needs to be linked to this task
        predecessor_task = self.radbrpc.getTask(mom_id=predecessor_mom_id)

        if not predecessor_task:
            # Occurs when setting a pipeline to prescheduled while a predecessor has e.g. never been beyond approved,
            # which is in principle valid. The link in the radb will be made later via processSuccessors() below.
            # Alternatively, a predecessor could have been deleted.
            logger.warning('could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', predecessor_mom_id, task['otdb_id'])
            continue

        if predecessor_task['id'] in task['predecessor_ids']:
            continue  # already linked

        logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s otdb_id=%s',
                    predecessor_task['mom_id'], predecessor_task['otdb_id'], task['mom_id'], task['otdb_id'])
        self.radbrpc.insertTaskPredecessor(task['id'], predecessor_task['id'])
def processSuccessors(self, task):
    """ Link each successor of task (as known to MoM) to task in the radb.

    Successor mom ids come from the MoM query service. A successor that exists in the
    radb and is not yet in task['successor_ids'] gets task as its predecessor, and
    movePipelineAfterItsPredecessors() is called for each successor found. A successor
    that cannot be found in the radb only triggers a warning.
    """
    mom_id = task['mom_id']

    successor_mom_ids = self.momrpc.getSuccessorIds(mom_id).get(str(mom_id))
    if not successor_mom_ids:
        logger.info('no successors for otdb_id=%s mom_id=%s', task['otdb_id'], mom_id)
        return

    logger.info('processing successor mom_ids=%s for mom_id=%s otdb_id=%s', successor_mom_ids, task['mom_id'], task['otdb_id'])

    for successor_mom_id in successor_mom_ids:
        # check if the successor needs to be linked to this task
        successor_task = self.radbrpc.getTask(mom_id=successor_mom_id)

        if not successor_task:
            # Occurs when settings a obs or task to prescheduled while a successor has e.g. not yet been beyond approved,
            # which is quite normal. The link in the radb will be made later via processPredecessors() above.
            # Alternatively, a successor could have been deleted.
            logger.warning('could not find successor task with mom_id=%s in radb for task otdb_id=%s', successor_mom_id, task['otdb_id'])
            continue

        if successor_task['id'] not in task['successor_ids']:
            logger.info('connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s otdb_id=%s',
                        successor_task['mom_id'], successor_task['otdb_id'], task['mom_id'], task['otdb_id'])
            self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id'])

        # NOTE(review): assumed to run for every successor found in the radb, not only
        # for newly linked ones - confirm against the intended nesting.
        movePipelineAfterItsPredecessors(successor_task, self.radbrpc)
def getMaxPredecessorEndTime(self, specification_tree):
    """ Return the latest 'Observation.stopTime' among the predecessor specifications
    in specification_tree['predecessors'], or None if there are no predecessors. """
    parsets = [parameterset(predecessor['specification'])
               for predecessor in specification_tree['predecessors']]
    stop_times = [parseDatetime(parset.getString('Observation.stopTime'))
                  for parset in parsets]
    return max(stop_times) if stop_times else None
def getClusterNames(self, parset):
    """ Return the set of storage cluster names of all enabled output data product types in parset.

    A data product type counts as enabled when its
    'Observation.DataProducts.<type>.enabled' key is true (default False). The cluster
    name is read without a default, so (as the original comment notes) the lookup may
    raise for an enabled type that has no storage cluster name.
    """
    data_product_types = ('Output_Correlated',
                          'Output_IncoherentStokes',
                          'Output_CoherentStokes',
                          'Output_InstrumentModel',
                          'Output_SkyImage',
                          'Output_Pulsar')

    # may raise; don't pass default arg to getString
    return {parset.getString('Observation.DataProducts.%s.storageClusterName' % dp_type)
            for dp_type in data_product_types
            if parset.getBool('Observation.DataProducts.%s.enabled' % dp_type, False)}
def applyMaxFillRatios(self, db_resource_list,
                       db_resource_max_fill_ratios):
    """ Applies db_resource_max_fill_ratios to db_resource_list (in place).

    db_resource_max_fill_ratios is e.g. [{'name': max_fill_ratio_CEP4_storage, 'value': 0.85}, ...]

    For every ratio whose name ends in '_<resource type name>', the resource group name is
    taken from the part between the 'max_fill_ratio_' prefix and that suffix. Each resource
    of that type in the group gets its 'available_capacity' capped at
    ratio * 'total_capacity'. A ratio that is not a float in [0.0, 1.0] is logged and not applied.

    :param db_resource_list: resources, indexed by resource id; each needs 'type_id',
                             'available_capacity' and 'total_capacity'
                             NOTE(review): indexing by id assumes position == resource id - confirm
    :param db_resource_max_fill_ratios: list of {'name': ..., 'value': ...} dicts
    :raises ValueError: (from getResourceGroupIdByName) if the resource group name is unknown
    """
    prefix = 'max_fill_ratio_'  # + resource_group_name + '_' + resource_type_name

    for ratio_dict in db_resource_max_fill_ratios:
        # .items() rather than the Python 2-only .iteritems(), so this runs on Python 2 and 3.
        for res_type_name, res_type_id in self.resource_types.items():
            if not ratio_dict['name'].endswith('_' + res_type_name):
                continue

            res_group_name = ratio_dict['name'][len(prefix) : -len(res_type_name)-1]
            res_group_id = self.getResourceGroupIdByName(res_group_name)

            res_group = self.resource_group_relations[res_group_id]
            for res_id in res_group['resource_ids']:
                res = db_resource_list[res_id]
                if res['type_id'] != res_type_id:
                    continue

                try:
                    ratio = float(ratio_dict['value'])
                    if ratio < 0.0 or ratio > 1.0:
                        raise ValueError('value not a float in range [0.0, 1.0]')
                except ValueError as err:
                    logger.error('applyMaxFillRatios: %s = %s: %s', ratio_dict['name'], ratio_dict['value'], str(err))
                    # the ratio is invalid for every resource in this group: stop trying
                    break

                res['available_capacity'] = min(res['available_capacity'], int(ratio * res['total_capacity']))
                logger.info('applyMaxFillRatios: applied %s = %f', ratio_dict['name'], ratio)
def findClaims(self, needed_resources_list, db_resource_list):
    """ Compute claims that satisfy needed_resources_list within db_resource_list.

    :param needed_resources_list: a list of resources to be claimed; each entry is a dict with
                                  'resource_types' ({name: size}), 'resource_count' and
                                  'root_resource_group'. The entries are not modified.
    :param db_resource_list: all resources in RADB with availability information
    :returns all_fit: a boolean indicating whether all requested resources could be claimed
    :returns claims: all claims that succeeded (partial if all_fit is False)
    :returns unclaimable_resources: all resources from needed_resources_list that could not be claimed ([] if all_fit is True)
    :raises ValueError: (from getResourceGroupIdByName) for an unknown 'root_resource_group'
    """
    # This function selects resources for a task (i.e. obs or pipeline). Keep it side-effect free!
    # Criteria:
    # * It all fits within max fill ratio per resource group (or consider failed)
    # * Avoid resources marked as unavailable
    # * At most one claim per resource; i.e. merge where possible (DB friendly)
    # * Most pipelines reduce data size. Observation output is relatively large,
    #   so spread across as many storage areas as possible (if storage is needed).
    # * Only completely fill a (e.g. storage) resource when inevitable, as this also makes other
    #   resources (e.g. write bandwidth) unavailable for future tasks until clean up.
    # * All 4 data products of complex voltage obs data (XXYY) go to a single (storage) resource.
    #
    # Parts of these criteria may be solved by the caller, e.g. by passing filtered arguments.
    #
    # Note: certain (obs) settings are more flexible than ever used (e.g. sb list per SAP for CS/IS).
    # We must support the full gamut of settings, but for scenarios that are complex *and* constrained,
    # producing 'conflict' may be ok, if this saves implementation or excessive computational complexity.
    # Users will then have to free more resources than strictly necessary before retrying.

    logger.debug('findClaims: db_resource_list: %s', db_resource_list)  # big!

    dtypes = {'uv', 'cs', 'is', 'im', 'img', 'pulp'}
    summablePropTypeIds = {self.resource_claim_property_types['nr_of_' + dt + '_files'] for dt in dtypes}

    claims = []

    # needed_resources that we can't find a fit for
    unclaimable_resources = []

    for needed_resources in needed_resources_list:
        # Replace resource names by type ids: easy matching w/ other data structs
        needed_resources_by_type_id = {self.resource_types[name]: needed_resources['resource_types'][name]
                                       for name in needed_resources['resource_types']}  # e.g. {3: 16536, 5: 170016}
        logger.info('findClaims: needed_resources_by_type_id: %s', needed_resources_by_type_id)

        # Find group id ('gid') of needed_resources['root_resource_group'],
        # then get list of claimable resources at root_gid and its children
        root_gid = self.getResourceGroupIdByName(needed_resources['root_resource_group'])
        claimable_resources_list = self.getSubtreeResourcesList(root_gid, needed_resources_by_type_id,
                                                                db_resource_list)  # e.g. [{3: <resource_dict>, 5: <resource_dict>}, ...]
        logger.info('findClaims: considering %d claimable resource dict(s)', len(claimable_resources_list))
        logger.debug('findClaims: claimable_resources_list: %s', claimable_resources_list)

        # Obtain resource properties (if any)
        properties = self.getResourcesProperties(needed_resources)

        # Work on a local count: the caller's needed_resources must stay untouched, also
        # because it is handed back as-is in unclaimable_resources when no fit is found.
        resource_count = needed_resources['resource_count']

        # Collapse needed resources if only 1 claimable resource dict, e.g. global filesystem
        if len(claimable_resources_list) == 1:
            logger.info('findClaims: collapsing needed_resources')
            for type_id in needed_resources_by_type_id:
                needed_resources_by_type_id[type_id] *= resource_count
            for prop in properties:
                if prop['type'] in summablePropTypeIds:
                    prop['value'] *= resource_count
            resource_count = 1

        # Collect all the claims for this resource
        more_claims = self.fitMultipleResources(needed_resources_by_type_id, resource_count, claimable_resources_list)
        if more_claims is None:
            # could not find a fit for this resource
            logger.warning('findClaims: Failed to find enough claimable resources for %s (by_type_id: %s)',
                           needed_resources, needed_resources_by_type_id)
            unclaimable_resources.append(needed_resources)
            continue

        # add resource properties to storage claims
        for claim in more_claims:
            if claim['resource_type_id'] == self.resource_types['storage']:
                claim['properties'] = properties

        # add to the list of claims
        claims.extend(more_claims)

    self.mergeClaims(summablePropTypeIds, claims)

    all_fit = len(unclaimable_resources) == 0
    return (all_fit, claims, unclaimable_resources)
def fitMultipleResources(self, needed_resources_by_type_id, resource_count, claimable_resources_list):
    """ Find a fit for multiple needed resources. Modifies claimable_resources_list with a lower resource
    availability with respect to the claims made (also if no claims are made!).

    :param needed_resources_by_type_id: {resource_type_id: size, ...} needed per resource set
    :param resource_count: how many of such resource sets must be fitted
    :param claimable_resources_list: [{resource_type_id: <resource_dict>, ...}, ...]
    :returns: the list of all claims made, or None as soon as one resource set does not fit
    """
    claims = []

    # range() instead of the Python 2-only xrange(): available on both Python 2 and 3.
    for _ in range(resource_count):
        # try to fit a single resource set
        more_claims = self.fitSingleResources(needed_resources_by_type_id, claimable_resources_list)
        if more_claims is None:
            # Could not find a fit
            return None

        logger.debug('fitMultipleResources: added claim: %s', more_claims)

        # add it to our list
        claims.extend(more_claims)

    logger.debug('fitMultipleResources: created claim: %s', claims)
    return claims
def fitSingleResources(self, needed_resources_by_type_id, claimable_resources_list):
    """ Find a fit for a single needed resource set. Reorders claimable_resources_list,
    and reduces the resource availability in claimable_resources_list with the size
    of the resulting claims.

    :param needed_resources_by_type_id: {resource_type_id: size, ...}
    :param claimable_resources_list: [{resource_type_id: <resource_dict>, ...}, ...]
    :returns: the list of claims for the first claimable resource dict that fits, or None if none fits
    """
    storage_type_id = self.resource_types['storage']
    if storage_type_id in needed_resources_by_type_id:
        sort_res_type = storage_type_id
    else:
        # some other if not storage
        # list(d)[0] instead of d.keys()[0]: dict views are not indexable on Python 3
        sort_res_type = list(needed_resources_by_type_id)[0]

    # Try to fit first where there is the most space
    # Sorting on every change may be slow, but for 100s of DPs, insertion of merged claims is still 3-5x slower.
    # A heapq was not faster, yet solving the lack of total ordering more elaborate.
    # Of course, big-O complexity here is terrible, but we are nowhere near (too) big.
    claimable_resources_list.sort(key=lambda res: res[sort_res_type]['available_capacity'], reverse=True)

    # Almost always iterates once. Still needed to match >1 resource types.
    for claimable_resources_dict in claimable_resources_list:
        claims = self.tryMakeClaim(needed_resources_by_type_id, claimable_resources_dict)
        if claims is not None:
            logger.debug('fitSingleResources: created claim: %s', claims)

            # reduce the resource availability with the size of these claims
            self.reduceResourceAvailability(claimable_resources_dict, claims)
            return claims

    # Could not find a fit
    return None
def tryMakeClaim(self, needed_resources_by_type_id, claimable_resources_dict):
    """ Return the claims for needed_resources_by_type_id on claimable_resources_dict
    if they fit capacity-wise, or None if they do not. """
    # The claimable capacity of RCUs is not checked
    rcu_type_id = self.resource_types['rcu']
    fits = self.is_claimable_capacity_wise(needed_resources_by_type_id,
                                           claimable_resources_dict,
                                           ignore_type_ids=[rcu_type_id])
    if not fits:
        # Claim does not fit
        return None

    return self.makeClaim(needed_resources_by_type_id, claimable_resources_dict)
def getResourceGroupIdByName(self, name):
    """ Returns group id of resource group named name, or raises a ValueError if name was not found.
    The search happens breadth-first, starting at the root group (id 0).
    """
    current_level = [0]  # root group id 0
    while current_level:
        next_level = []
        for gid in current_level:
            res_group = self.resource_group_relations[gid]
            if res_group['resource_group_name'] == name:
                return gid
            # children are only looked at after this whole level has been checked
            next_level.extend(res_group['child_ids'])
        current_level = next_level

    raise ValueError('getResourceGroupIdByName: cannot find resources to claim: unknown root_resource_group \'%s\'' % name)
def getSubtreeResourcesList(self, root_gid, needed_resources_by_type_id, db_resource_list):
    """ Returns list of available resources of type id in needed_resources_by_type_id.keys()
    starting at group id root_gid in the format [{type_id: {<resource_dict>}, ...}, ...].

    A resource counts as available when it is 'active' and has 'available_capacity' > 0.
    A resource group only contributes an entry if it holds an available resource of
    every needed type.
    """
    needed_type_ids = set(needed_resources_by_type_id)
    resources_list = []

    # Search breadth-first starting at root_gid, one level of the group tree at a time.
    current_level = [root_gid]
    while current_level:
        next_level = []
        for gid in current_level:
            res_group = self.resource_group_relations[gid]

            resources = {}
            for rid in res_group['resource_ids']:
                resource = db_resource_list[rid]
                type_id = resource['type_id']
                usable = (type_id in needed_type_ids and
                          resource['active'] and
                          resource['available_capacity'] > 0)
                if usable:
                    resources[type_id] = resource

            # Only add resource IDs if all needed types are present in this resource group
            if set(resources) == needed_type_ids:
                resources_list.append(resources)

            next_level.extend(res_group['child_ids'])
        current_level = next_level

    return resources_list
def is_claimable_capacity_wise(self, needed_resources, claimable_resources, ignore_type_ids=None):
    """ Check whether every needed resource fits in the matching claimable resource.

    :param needed_resources: {resource_type_id: size, ...}
    :param claimable_resources: {resource_type_id: {<resource_dict>}, ...}
    :param ignore_type_ids: IDs of types that should not be considered
    :return: True if each considered size is at most the 'available_capacity' of
             the claimable resource of the same type, False otherwise
    """
    skipped_types = [] if ignore_type_ids is None else ignore_type_ids

    for res_type, claim_size in needed_resources.items():
        if res_type in skipped_types:
            continue
        if not claim_size <= claimable_resources[res_type]['available_capacity']:
            return False

    return True
def makeClaim(self, needed_resources, claimable_resources):
    """ Create one claim per needed resource type.

    The returned claims have status 'claimed' and leave starttime and endtime set
    to None; before sending a claim to RADB the caller still has to fill in:
        starttime, endtime, properties (optional)

    :param needed_resources: {resource_type_id: size, ...}
    :param claimable_resources: {resource_type_id: {<resource_dict>}, ...}
    :return: list of claim dicts, in the iteration order of needed_resources
    """
    claims = []
    for res_type, claim_size in needed_resources.items():
        resource_id = claimable_resources[res_type]['id']

        # The RCU "size" handed over by the ResourceEstimator is really a bit pattern
        # telling which of a station's RCUs take part in a measurement. A claim size
        # has to be countable, so the number of requested RCUs is claimed and the bit
        # pattern itself travels along with the claim in 'used_rcus'.
        used_rcus = None
        if res_type == self.resource_types['rcu']:
            used_rcus = claim_size
            claim_size = used_rcus.count('1')

        claims.append({'starttime': None,
                       'endtime': None,
                       'status': 'claimed',
                       'resource_id': resource_id,
                       'resource_type_id': res_type,  # used internally, not propagated to radb
                       'used_rcus': used_rcus,
                       'claim_size': claim_size})
    return claims
def reduceResourceAvailability(self, claimable_resources_dict, claims):
    """ Subtract the claimed sizes from the capacity of the claimed resources.

    For every claim, the 'available_capacity' of the entry in
    claimable_resources_dict keyed by the claim's 'resource_type_id' is lowered
    by the claim's 'claim_size'. The dict is updated in place; nothing is returned.
    """
    for claim in claims:
        type_id, size = claim['resource_type_id'], claim['claim_size']
        claimable_resources_dict[type_id]['available_capacity'] -= size
def getFilesProperties(self, files_dict, io_type):
    """ Convert files_dict into a flat list of properties in claim format.

    E.g. files_dict: {'cs': [ {'sap_nr': 2, ..., 'properties': {'nr_of_uv_files': 123, ...}}, {...} ], 'is': ...}

    Property names are translated to ids through self.resource_claim_property_types;
    names without an id are logged as an error and left out.

    :param files_dict: dict as sketched above, or None
    :param io_type: value stored under 'io_type' in every returned property
    :return: list of {'type': ..., 'value': ..., 'io_type': ...} dicts, each with an
             extra 'sap_nr' field when its source entry carries one; [] for None
    """
    if files_dict is None:
        return []

    logger.info('getFilesProperties: processing %s_files: %s', io_type, files_dict)

    claim_properties = []
    for dptype in files_dict:
        for entry in files_dict[dptype]:
            sap_nr = entry.get('sap_nr')  # only with obs output and obs successor input

            for name, value in entry['properties'].items():
                type_id = self.resource_claim_property_types.get(name)
                if type_id is None:
                    logger.error('getFilesProperties: ignoring unknown prop type: %s', name)
                else:
                    claim_property = {'type': type_id, 'value': value, 'io_type': io_type}
                    if sap_nr is not None:
                        claim_property['sap_nr'] = sap_nr
                    claim_properties.append(claim_property)

    return claim_properties
def getResourcesProperties(self, needed_resources):
    """ Gather the file properties of needed_resources, if there are any.

    The properties of needed_resources' 'input_files' (io_type 'input') come first,
    followed by those of its 'output_files' (io_type 'output'); both are converted
    by getFilesProperties(). A missing entry is passed on as None.
    """
    properties = self.getFilesProperties(needed_resources.get('input_files'), 'input')
    properties += self.getFilesProperties(needed_resources.get('output_files'), 'output')
    return properties
# TODO: (Ruud B) check and decide if we need to be able to merge claims based on claim[used_rcus]
def mergeClaims(self, summablePropTypeIds, claims):
    """ Merge claims allocated onto the same resources.

    Claims with equal 'resource_id' are folded into the first such claim: their
    'claim_size' values are added up and their properties are merged with
    mergeResourceProperties(). All other fields of the surviving claim are kept
    as they are; starttime and endtime are always equal for the same resource_id.

    :param summablePropTypeIds: property type ids whose values must be added up
                                when properties are merged
    :param claims: list of claim dicts; it is sorted on resource_id and reduced
                   in place, nothing is returned
    """
    logger.info('mergeClaims: merging claims for the same resource across %d claims', len(claims))

    # Sort on resource_id only. Property lists hold dicts (or are absent), which cannot
    # be ordered on Python 3, so they must stay out of the sort key. The sort is stable:
    # claims on the same resource keep their relative order.
    claims.sort(key=lambda claim: claim['resource_id'])

    merged = []
    for claim in claims:
        if not merged or merged[-1]['resource_id'] != claim['resource_id']:
            merged.append(claim)
            continue

        target = merged[-1]
        target['claim_size'] += claim['claim_size']

        props = claim.get('properties')
        if props:
            # shallow copy to avoid aliasing; mergeResourceProperties() does the rest
            target['properties'] = list(target.get('properties') or [])
            self.mergeResourceProperties(summablePropTypeIds, target['properties'], props)

    # Callers rely on the list they passed in being updated
    claims[:] = merged
def mergeResourceProperties(self, summablePropTypeIds, props0, props1):
    """ Ensure props0 contains all properties of props1.

    Two properties are the same when their 'type', 'sap_nr' (if any) and 'io_type'
    are equal. For the same property:
      - if its type is in summablePropTypeIds, the value of the props1 entry is
        added to a copy of the props0 entry, which replaces the props0 entry;
      - otherwise props0 keeps its entry, and a warning is logged if the values differ.
    A props1 entry that has no counterpart in props0 is appended to props0.

    NOTE: caller has to ensure that 'not (props0 is props1)' holds.

    :param summablePropTypeIds: property type ids whose values must be added up
    :param props0: list of property dicts, updated in place
    :param props1: list of property dicts to merge into props0
    """
    # Better datastructs could easily avoid this O(N^2). len(props) is ~5
    # Only the entries present on entry are candidates for a match: entries appended
    # from props1 below are not compared against later props1 entries.
    props0len = len(props0)

    for p1 in props1:
        for i in range(props0len):
            p0 = props0[i]
            if p0['type'] == p1['type'] and \
               p0.get('sap_nr') == p1.get('sap_nr') and \
               p0['io_type'] == p1['io_type']:
                if p0['type'] in summablePropTypeIds:  # same and need to sum values
                    summed = dict(p0)  # copy to avoid changing a dict that is shared elsewhere
                    summed['value'] += p1['value']
                    props0[i] = summed
                elif p0['value'] != p1['value']:
                    # logger.warn() is a deprecated alias that newer Pythons no longer provide
                    logger.warning('mergeResourceProperties: unexpected same prop pair, but with different values: %s and %s', p0, p1)
                break
        else:
            # no counterpart found in props0
            props0.append(p1)