Commit 6dded040 authored by Ruud Beukema


Task #10780: Started refactoring doAssignment() up until the call to the findclaims() function. Refactored unit tests accordingly.
parent 30a75e11
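
For orientation, here is a condensed sketch of the shape doAssignment() takes after this commit, reconstructed from the added lines in the diff below. It is not the literal committed code: the helper names are taken from the diff, while the exact ordering and the error handling around _do_assignment() are assumptions, and the bodies of the helpers are elided.

    # Condensed sketch of the refactored flow (reconstructed, not the committed code)
    def doAssignment(self, otdb_id, specification_tree):
        logger.info('doAssignment: specification_tree=%s' % specification_tree)
        self._do_assignment(otdb_id, specification_tree)

    def _do_assignment(self, otdb_id, specification_tree):
        self.task = None
        if not self._get_is_assignable(specification_tree):
            return
        main_parset = self._get_main_parset(specification_tree)
        task_type, task_subtype = self._get_task_type(specification_tree)
        cluster_name = self._get_clustername(otdb_id, main_parset, task_type, task_subtype)
        start_time, end_time = self._get_main_task_start_and_end_times(specification_tree)
        task_id, self.task = self._insert_main_task(specification_tree, start_time, end_time, cluster_name)
        self.processPredecessors(self.task)
        self.processSuccessors(self.task)
        if specification_tree.get('state', '').lower() == 'approved':
            return  # approved tasks are only inserted, not claimed
        estimates = self.getResourceEstimates(specification_tree, otdb_id, task_type, task_id)
        # ... claim finding and insertion continue as in the diff (findClaims, insertResourceClaims, ...)
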
@@ -98,6 +98,8 @@ class ResourceAssigner():
self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker)
self.ra_notification_prefix = ra_notification_prefix
self.task = None
def __enter__(self):
"""Internal use only. (handles scope 'with')"""
self.open()
@@ -165,158 +167,59 @@ class ResourceAssigner():
dtypes = {'uv', 'cs', 'is', 'im', 'img', 'pulp'}
return {self.resource_claim_property_types['nr_of_' + dt + '_files'] for dt in dtypes}
def doAssignment(self, specification_tree):
logger.info('doAssignment: specification_tree=%s' % (specification_tree))
def doAssignment(self, otdb_id, specification_tree):
""" Attempts to assign the specified resources
otdb_id = specification_tree['otdb_id']
:param otdb_id: OTDB ID of the main task whose resources need to be assigned
:param specification_tree: the specification tree containing the main task and its resources
"""
assignable_task_states = ['approved', 'prescheduled']
status = specification_tree.get('state', '').lower()
if status not in assignable_task_states:
assignable_task_states_str = ', '.join(assignable_task_states)
logger.warn('skipping specification for task otdb_id=%s, because status=%s (not one of %s)', otdb_id,
status, assignable_task_states_str)
return
logger.info('doAssignment: specification_tree=%s' % specification_tree)
try:
mainParset = parameterset(specification_tree['specification'])
except Exception as e:
logger.error(str(e))
return
self._do_assignment(otdb_id, specification_tree)
taskType = specification_tree['task_type'] # is required item
if 'task_subtype' in specification_tree: # is optional item
taskSubtype = specification_tree['task_subtype']
else:
taskSubtype = ''
if self.taskTypeProducesOutput( taskType , taskSubtype):
# Only assign resources for task output to known clusters
try:
clusterNameSet = self.getClusterNames(mainParset)
if str() in clusterNameSet or len(clusterNameSet) != 1:
# Empty set or name is always an error.
# TODO: To support >1 cluster per obs,
# self.radbrpc.insertSpecificationAndTask() as called below and the radb would need to take >1 cluster name
# Also, there is only 1 processingClusterName in the parset, but we do not always want to pipeline process all obs outputs, or not on 1 cluster
logger.error('clusterNameSet must have a single non-empty name for all enabled DataProducts, but is: %s' % clusterNameSet)
return
clusterName = clusterNameSet.pop()
# Retrieve known cluster names (not all may be a valid storage target, but we cannot know...)
knownClusterSet = {clus['name'] for clus in self.radbrpc.getResourceGroupNames('cluster')}
logger.info('known clusters: %s', knownClusterSet)
if clusterName not in knownClusterSet:
logger.error('skipping resource assignment for task with cluster name \'%s\' not in known clusters %s' % (clusterName, knownClusterSet))
return
except Exception as e:
logger.error(str(e))
return
try:
# fix for MoM bug introduced before NV's holiday
# MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4
# so, override it here if needed, and update to otdb
processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '')
if processingClusterName != clusterName:
logger.info('overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' for otdb_id=%s',
processingClusterName, clusterName, otdb_id)
self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName })
if self.task is not None:
# send notification that the task was scheduled
self._announceStateChange(self.task, 'scheduled')
except Exception as e:
logger.error(str(e))
return
else:
# task has no output
clusterName = ''
def applySaneStartEndTime(origStartTime, origEndTime):
startTime = datetime.utcnow() + timedelta(minutes=3)
if self.task is not None:
self._announceStateChange(self.task, 'error')
maxPredecessorEndTime = self.getMaxPredecessorEndTime(specification_tree)
if maxPredecessorEndTime and maxPredecessorEndTime > startTime:
startTime = maxPredecessorEndTime + timedelta(minutes=3)
def _do_assignment(self, otdb_id, specification_tree):
""" Performs the actual attempt to assign resources of the desired task """
taskDuration = timedelta(seconds=totalSeconds(origEndTime - origStartTime)) if origEndTime > origStartTime else timedelta(hours=1)
endTime = startTime + taskDuration
logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s',
startTime, endTime, otdb_id)
logger.info('uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', startTime, endTime, otdb_id)
self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.startTime': startTime.strftime('%Y-%m-%d %H:%M:%S'),
'LOFAR.ObsSW.Observation.stopTime': endTime.strftime('%Y-%m-%d %H:%M:%S')})
return startTime, endTime
# TODO: don't fix this crap here. Bad start/stop time has to go to error, like any other bad spec part. Fix the cause! Idem for MoM fix up below.
try:
startTime = parseDatetime(mainParset.getString('Observation.startTime'))
endTime = parseDatetime(mainParset.getString('Observation.stopTime'))
if startTime < datetime.utcnow():
startTime, endTime = applySaneStartEndTime(startTime, endTime)
except Exception as e:
logger.error(str(e))
return
self.task = None
if self._get_is_assignable(specification_tree):
main_parset = self._get_main_parset(specification_tree)
task_type, task_subtype = self._get_task_type(specification_tree)
cluster_name = self._get_clustername(otdb_id, main_parset, task_type, task_subtype)
start_time, end_time = self._get_main_task_start_and_end_times(specification_tree)
logger.info('doAssignment: Accepted specification')
# insert new task and specification in the radb
# any existing specification and task with same otdb_id will be deleted automatically
momId = mainParset.getInt('Observation.momID', -1)
task_id, self.task = self._insert_main_task(specification_tree, start_time, end_time, cluster_name)
logger.info('doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s cluster=%s' %
(momId, otdb_id, status, taskType, startTime, endTime, clusterName))
try:
result = self.radbrpc.insertSpecificationAndTask(momId, otdb_id, status, taskType,
startTime, endTime, str(mainParset), clusterName)
except Exception as e:
logger.error(str(e))
result = {'inserted': False} # handled next
if not result['inserted']:
logger.error('could not insert specification and task: result = %s', result)
return
logger.info('doAssignment: task=%s', self.task)
specificationId = result['specification_id']
taskId = result['task_id']
logger.info('doAssignment: inserted specification (id=%s) and task (id=%s)' % (specificationId, taskId))
self.processPredecessors(self.task)
self.processSuccessors(self.task)
task = None
try:
task = self.radbrpc.getTask(taskId)
except Exception as e:
logger.error(str(e))
return
logger.info('doAssignment: task=%s', task)
try:
self.processPredecessors(task)
self.processSuccessors(task)
except Exception as e:
logger.error(str(e))
self._announceStateChange(task, 'error')
return
if status != 'prescheduled': # should only happen for approved
# Do this check after insertion of specif, task and predecessor/successor relations,
status = specification_tree.get('state', '').lower()
if status == 'approved':
# Do this check after insertion of specification, task and predecessor/successor relations,
# so approved tasks appear correctly in the web scheduler.
logger.info('skipping resource assignment for task otdb_id=%s, because status=%s' % (otdb_id, status))
return
try:
estimates = self.getResourceEstimates(specification_tree, otdb_id, taskType, taskId)
else:
estimates = self.getResourceEstimates(specification_tree, otdb_id, task_type, task_id)
db_resource_list = self.radbrpc.getResources(include_availability=True)
db_resource_max_fill_ratios = self.radbrpc.getResourceAllocationConfig(sql_like_name_pattern='max_fill_ratio_%')
except Exception as e:
logger.error(str(e))
self._announceStateChange(task, 'error')
return
self.applyMaxFillRatios(db_resource_list, db_resource_max_fill_ratios)
else:
return
# Assume estimates are (close enough to) accurate to determine resource claims for this task.
# Try to get a set of non-conflicting claims from availability info in the RA DB.
@@ -326,50 +229,206 @@ class ResourceAssigner():
# If any claim is in conflict state, then the task is put to conflict status as well.
all_fit, claims, _ = self.findClaims(estimates, db_resource_list)
if not all_fit:
self._announceStateChange(task, 'error')
self._announceStateChange(self.task, 'error')
return
if claims:
# Complete the claims by annotating them with task information
self.tieClaimsToTask(claims, task)
self.tieClaimsToTask(claims, self.task)
logger.info('doAssignment: inserting %d claims in the radb: %s', len(claims), claims)
try:
claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids']
claim_ids = self.radbrpc.insertResourceClaims(self.task['id'], claims, 1, 'anonymous', -1)['ids']
logger.info('doAssignment: %d claims were inserted in the radb' % len(claim_ids))
if len(claim_ids) != len(claims):
logger.error('doAssignment: too few claims were inserted in the radb')
self._announceStateChange(task, 'conflict')
self._announceStateChange(self.task, 'conflict')
return
conflictingClaims = self.radbrpc.getResourceClaims(task_ids=taskId, status='conflict')
conflictingClaims = self.radbrpc.getResourceClaims(task_ids=task_id, status='conflict')
if conflictingClaims:
# Will do _announceStateChange(task, 'conflict'), and radb sets task status to conflict automatically
# Will do _announceStateChange(self.task, 'conflict'), and radb sets task status to conflict automatically
logger.error('doAssignment: Task cannot be scheduled, because of %d conflicting claims: %s' %
(len(conflictingClaims), conflictingClaims))
self._announceStateChange(task, 'conflict')
self._announceStateChange(self.task, 'conflict')
return
logger.info('doAssignment: all resources for task %s were succesfully claimed. Setting claim statuses to claimed' % (taskId,))
self.radbrpc.updateTaskAndResourceClaims(taskId, claim_status='claimed')
logger.info('doAssignment: all resources for task %s were successfully claimed. Setting claim statuses to claimed' % (task_id,))
self.radbrpc.updateTaskAndResourceClaims(task_id, claim_status='claimed')
except Exception as e:
logger.error(str(e))
self._announceStateChange(task, 'conflict')
self._announceStateChange(self.task, 'conflict')
return
# remove any output and/or intermediate data for restarting pipelines
if task['type'] == 'pipeline':
if self.task['type'] == 'pipeline':
try:
du_result = self.sqrpc.getDiskUsageForOTDBId(task['otdb_id'], include_scratch_paths=True, force_update=True)
du_result = self.sqrpc.getDiskUsageForOTDBId(self.task['otdb_id'], include_scratch_paths=True, force_update=True)
if du_result['found'] and du_result.get('disk_usage', 0) > 0:
logger.info("removing data on disk from previous run for otdb_id %s", otdb_id)
result = self.curpc.removeTaskData(task['otdb_id'])
result = self.curpc.removeTaskData(self.task['otdb_id'])
if not result['deleted']:
logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s", otdb_id, result['message'])
except Exception as e:
logger.error(str(e)) # in line with failure as warning just above: allow going to scheduled state here too
# send notification that the task was scheduled
self._announceStateChange(task, 'scheduled')
def _get_is_assignable(self, specification_tree):
""" Verifies if a task can actually be assigned
:param specification_tree: the specification tree of the task
:returns True if the task can be assigned or False if not
"""
is_assignable = True
otdb_id = specification_tree['otdb_id']
assignable_task_states = ['approved', 'prescheduled']
status = specification_tree.get('state', '').lower()
if status not in assignable_task_states:
is_assignable = False
assignable_task_states_str = ', '.join(assignable_task_states)
logger.warn('skipping specification for task otdb_id=%s, because status=%s (not one of %s)', otdb_id,
status, assignable_task_states_str)
return is_assignable
def _get_main_parset(self, specification_tree):
""" Extracts the main parset from a specification tree
:param specification_tree: the specification tree of the task
:returns the main parset
"""
return parameterset(specification_tree['specification'])
def _get_task_type(self, specification_tree):
taskType = specification_tree['task_type'] # is required item
if 'task_subtype' in specification_tree: # is optional item
taskSubtype = specification_tree['task_subtype']
else:
taskSubtype = ''
return taskType, taskSubtype
def _get_clustername(self, otdb_id, mainParset, taskType, taskSubtype):
""" Determines the name of the cluster to which to store the task's output - if it produces output at all.
:param otdb_id: the OTDB ID of the (main) task
:param mainParset: the parset of the main task
:param taskType: the task's type
:param taskSubtype: the task's subtype
:returns The name of the output cluster, or an empty string if none is applicable
"""
clusterName = ''
if self.taskTypeProducesOutput(taskType, taskSubtype):
# Only assign resources for task output to known clusters
clusterNameSet = self.getClusterNames(mainParset)
if str() in clusterNameSet or len(clusterNameSet) != 1:
# Empty set or name is always an error.
# TODO: To support >1 cluster per obs,
# self.radbrpc.insertSpecificationAndTask() as called below and the radb would need to take >1 cluster name
# Also, there is only 1 processingClusterName in the parset, but we do not always want to pipeline process all obs outputs, or not on 1 cluster
logger.error(
'clusterNameSet must have a single non-empty name for all enabled DataProducts, but is: %s' % clusterNameSet)
else:
clusterName = clusterNameSet.pop()
# Retrieve known cluster names (not all may be a valid storage target, but we cannot know...)
knownClusterSet = {clus['name'] for clus in self.radbrpc.getResourceGroupNames('cluster')}
logger.info('known clusters: %s', knownClusterSet)
if clusterName not in knownClusterSet:
raise Exception("skipping resource assignment for task with cluster name '" + clusterName +
"' not in known clusters " + str(knownClusterSet))
else:
# fix for MoM bug introduced before NV's holiday
# MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4
# so, override it here if needed, and update to otdb
processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '')
if processingClusterName != clusterName:
logger.info('overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' for otdb_id=%s',
processingClusterName, clusterName, otdb_id)
self.otdbrpc.taskSetSpecification(otdb_id, {
'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName})
return clusterName
def _get_main_task_start_and_end_times(self, specification_tree):
""" Get the start time and end time of the main task adapted such that (a) there's a period of 3 minutes between
tasks and (b) the start time and end time are actually in the future.
:param specification_tree: specification tree for the main task
:returns 2-tuple (start_time, end_time)
"""
def apply_sane_start_and_end_time(orig_start_time, orig_end_time, otdb_id):
start_time = datetime.utcnow() + timedelta(minutes=3)
max_predecessor_end_time = self.getMaxPredecessorEndTime(specification_tree)
if max_predecessor_end_time and max_predecessor_end_time > start_time:
start_time = max_predecessor_end_time + timedelta(minutes=3)
if orig_end_time > orig_start_time:
task_duration = timedelta(seconds=totalSeconds(orig_end_time - orig_start_time))
else:
task_duration = timedelta(hours=1)
end_time = start_time + task_duration
logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s',
start_time, end_time, otdb_id)
logger.info('uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', start_time,
end_time, otdb_id)
self.otdbrpc.taskSetSpecification(otdb_id,
{'LOFAR.ObsSW.Observation.startTime': start_time.strftime('%Y-%m-%d %H:%M:%S'),
'LOFAR.ObsSW.Observation.stopTime': end_time.strftime('%Y-%m-%d %H:%M:%S')})
return start_time, end_time
# TODO: don't fix this crap here. Bad start/stop time has to go to error, like any other bad spec part.
# TODO: Fix the cause! Idem for MoM fix up below.
main_parset = self._get_main_parset(specification_tree)
start_time = parseDatetime(main_parset.getString('Observation.startTime'))
end_time = parseDatetime(main_parset.getString('Observation.stopTime'))
if start_time < datetime.utcnow():
otdb_id = specification_tree['otdb_id']
start_time, end_time = apply_sane_start_and_end_time(start_time, end_time, otdb_id)
return start_time, end_time
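
To make the time adjustment above concrete, here is a small self-contained illustration of the same arithmetic (not part of the commit). The predecessor lookup is replaced by a hypothetical stand-in value, and the duration is computed with plain datetime subtraction instead of totalSeconds(); only the logic mirrors the helper.

    from datetime import datetime, timedelta

    # Hypothetical inputs: a task specified entirely in the past, 2 hours long.
    orig_start_time = datetime(2017, 1, 1, 12, 0, 0)
    orig_end_time = datetime(2017, 1, 1, 14, 0, 0)

    # Start at least 3 minutes from now ...
    start_time = datetime.utcnow() + timedelta(minutes=3)

    # ... or 3 minutes after the latest predecessor end time, whichever is later.
    max_predecessor_end_time = None  # stand-in for self.getMaxPredecessorEndTime(specification_tree)
    if max_predecessor_end_time and max_predecessor_end_time > start_time:
        start_time = max_predecessor_end_time + timedelta(minutes=3)

    # Keep the original duration if it is positive, otherwise fall back to 1 hour.
    if orig_end_time > orig_start_time:
        task_duration = orig_end_time - orig_start_time
    else:
        task_duration = timedelta(hours=1)

    end_time = start_time + task_duration  # 2 hours after the new start time, both in the future
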
def _insert_main_task(self, specification_tree, start_time, end_time, cluster_name):
""" Inserts the main task and its specification into the RADB. Any existing specification and task with same
otdb_id will be deleted automatically.
:param specification_tree: specification tree for the main task
:return: 2-tuple (task_id, task)
"""
task_type, _ = self._get_task_type(specification_tree)
main_parset = self._get_main_parset(specification_tree)
mom_id = main_parset.getInt('Observation.momID', -1)
status = specification_tree.get('state', '').lower()
otdb_id = specification_tree['otdb_id']
logger.info(
'doAssignment: insertSpecification mom_id=%s, otdb_id=%s, status=%s, task_type=%s, start_time=%s, end_time=%s cluster=%s' %
(mom_id, otdb_id, status, task_type, start_time, end_time, cluster_name))
result = self.radbrpc.insertSpecificationAndTask(mom_id, otdb_id, status, task_type, start_time, end_time,
str(main_parset), cluster_name)
specification_id = result['specification_id']
task_id = result['task_id']
logger.info('doAssignment: inserted specification (id=%s) and task (id=%s)' % (specification_id, task_id))
task = self.radbrpc.getTask(task_id)
return task_id, task
def getResourceEstimates(self, specification_tree, otdb_id, taskType, taskId):
""" Request and return checked estimates of needed resources from Resource Estimator. """
......
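
As a usage note on the signature change: callers now pass the OTDB ID explicitly instead of having doAssignment() extract it from the specification tree. A minimal hypothetical call might look like the sketch below; the values are made up, constructor arguments are omitted, and the tree only shows the fields used in this diff (otdb_id, state, task_type, task_subtype, specification).

    specification_tree = {
        'otdb_id': 123456,            # hypothetical OTDB ID
        'state': 'prescheduled',      # must be 'approved' or 'prescheduled' to be assignable
        'task_type': 'observation',
        'task_subtype': '',
        'specification': {},          # main parset content, as accepted by parameterset()
    }

    # __enter__ (shown in the diff) opens the connections; a matching __exit__ is assumed.
    with ResourceAssigner() as assigner:
        assigner.doAssignment(specification_tree['otdb_id'], specification_tree)
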