-
Jorrit Schaap authoredJorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
assignment.py 24.22 KiB
#!/usr/bin/env python
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: assignment.py 1580 2015-09-30 14:18:57Z loose $
"""
ResourceAssigner inserts/updates tasks and assigns resources to them based on the incoming parset.
"""
import logging
from datetime import datetime, timedelta
import time
import collections
from lofar.common.util import humanreadablesize
from lofar.messaging.messages import EventMessage
from lofar.messaging.messagebus import ToBus
from lofar.messaging.RPC import RPC, RPCException
from lofar.parameterset import parameterset
from lofar.sas.resourceassignment.resourceassigner.schedulechecker import movePipelineAfterItsPredecessors
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC
from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME
from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ResourceAssigner():
    """
    Inserts/updates tasks in the radb and assigns resources to them based on incoming specifications.

    An instance holds rpc clients for the radb, resource estimator, otdb, mom query and cleanup
    services, plus a bus on which notifications are sent. Use it in a 'with' statement, or call
    open() and close() explicitly, so that these connections are opened and closed.
    """

    def __init__(self,
                 radb_busname=RADB_BUSNAME,
                 radb_servicename=RADB_SERVICENAME,
                 re_busname=RE_BUSNAME,
                 re_servicename=RE_SERVICENAME,
                 otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
                 otdb_servicename=DEFAULT_OTDB_SERVICENAME,
                 cleanup_busname=DEFAULT_CLEANUP_BUSNAME,
                 cleanup_servicename=DEFAULT_CLEANUP_SERVICENAME,
                 ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME,
                 ra_notification_prefix=DEFAULT_RA_NOTIFICATION_PREFIX,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 broker=None):
        """
        ResourceAssigner inserts/updates tasks in the radb and assigns resources to it based on incoming parset.
        :param radb_busname: busname on which the radb service listens (default: lofar.ra.command)
        :param radb_servicename: servicename of the radb service (default: RADBService)
        :param re_busname: busname on which the resource estimator service listens (default: lofar.ra.command)
        :param re_servicename: servicename of the resource estimator service (default: ResourceEstimation)
        :param otdb_busname: busname passed to the otdb rpc (default: DEFAULT_OTDB_SERVICE_BUSNAME)
        :param otdb_servicename: servicename passed to the otdb rpc (default: DEFAULT_OTDB_SERVICENAME)
        :param cleanup_busname: busname passed to the cleanup rpc (default: DEFAULT_CLEANUP_BUSNAME)
        :param cleanup_servicename: servicename passed to the cleanup rpc (default: DEFAULT_CLEANUP_SERVICENAME)
        :param ra_notification_busname: address of the bus on which notifications are sent (default: DEFAULT_RA_NOTIFICATION_BUSNAME)
        :param ra_notification_prefix: prefix put in front of each notification subject (default: DEFAULT_RA_NOTIFICATION_PREFIX)
        :param mom_busname: busname passed to the mom query rpc (default: DEFAULT_MOMQUERY_BUSNAME)
        :param mom_servicename: servicename passed to the mom query rpc (default: DEFAULT_MOMQUERY_SERVICENAME)
        :param broker: Valid Qpid broker host (default: None, which means localhost)
        """
        self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker, timeout=180)
        self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True, timeout=180)
        self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180) ## , ForwardExceptions=True hardcoded in RPCWrapper right now
        self.momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker, timeout=180)
        self.curpc = CleanupRPC(busname=cleanup_busname, servicename=cleanup_servicename, broker=broker)
        self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker)
        self.ra_notification_prefix = ra_notification_prefix

    def __enter__(self):
        """Internal use only. (handles scope 'with')"""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Internal use only. (handles scope 'with')"""
        self.close()

    def open(self):
        """Open the rpc connections to all used services, and the notification bus."""
        self.radbrpc.open()
        self.rerpc.open()
        self.otdbrpc.open()
        self.momrpc.open()
        self.curpc.open()
        self.ra_notification_bus.open()

    def close(self):
        """Close the rpc connections to all used services, and the notification bus."""
        self.radbrpc.close()
        self.rerpc.close()
        self.otdbrpc.close()
        self.momrpc.close()
        self.curpc.close()
        self.ra_notification_bus.close()

    def doAssignment(self, specification_tree):
        """
        Insert the task described by specification_tree in the radb, link it to its
        predecessors and successors, and claim resources for it if it is a prescheduled CEP4 task.

        Specifications with a status other than 'approved' or 'prescheduled' are skipped.
        Depending on the outcome a 'scheduled', 'conflict' or 'error' notification is sent.

        :param specification_tree: dict with at least the keys 'otdb_id' and 'specification'
                                   (the parset contents). The keys 'task_type' and 'state' are
                                   read when present, and 'predecessors' is read when start/end
                                   times have to be derived.
        """
        logger.info('doAssignment: specification_tree=%s' % (specification_tree))

        otdb_id = specification_tree['otdb_id']
        taskType = specification_tree.get('task_type', '').lower()
        status = specification_tree.get('state', '').lower()

        if status not in ['approved', 'prescheduled']: # cep2 accepts both, cep4 only prescheduled, see below
            # pass the values as separate logging arguments; a single tuple cannot fill two placeholders
            logger.info('skipping specification for otdb_id=%s because status=%s', otdb_id, status)
            # actually skip, as the log message states
            return

        #parse main parset...
        mainParset = parameterset(specification_tree['specification'])

        momId = mainParset.getInt('Observation.momID', -1)

        clusterIsCEP4 = self.checkClusterIsCEP4(mainParset)
        clusterName = 'CEP4' if clusterIsCEP4 else 'CEP2'

        if clusterIsCEP4:
            timeFormat = '%Y-%m-%d %H:%M:%S'
            try:
                startTime = datetime.strptime(mainParset.getString('Observation.startTime'), timeFormat)
                endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), timeFormat)
            except ValueError:
                logger.warning('cannot parse for start/end time from specification for otdb_id=%s. searching for sane defaults...', otdb_id)
                maxPredecessorEndTime = self.getMaxPredecessorEndTime(specification_tree)

                # use the duration from the parset when it is positive, else fall back to one hour
                taskDuration = mainParset.getInt('Observation.Scheduler.taskDuration', -1)
                taskDuration = timedelta(seconds=taskDuration) if taskDuration > 0 else timedelta(hours=1)

                if maxPredecessorEndTime:
                    startTime = maxPredecessorEndTime + timedelta(minutes=1)
                    endTime = startTime + taskDuration
                    logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s based on maxPredecessorEndTime (%s)',
                                   startTime, endTime, otdb_id, maxPredecessorEndTime)
                else:
                    startTime = datetime.utcnow() + timedelta(hours=1) + timedelta(minutes=1)
                    endTime = startTime + taskDuration
                    logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s one hour from now',
                                   startTime, endTime, otdb_id)

                try:
                    logger.info('uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', startTime, endTime, otdb_id)
                    self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.startTime': startTime.strftime(timeFormat),
                                                                 'LOFAR.ObsSW.Observation.stopTime': endTime.strftime(timeFormat)})
                except Exception as e:
                    # best effort: a failed upload does not stop the assignment
                    logger.error(e)

            try:
                # fix for MoM bug introduced before NV's holiday
                # MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4
                # so, override it here if needed, and update to otdb
                processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '')
                if processingClusterName != clusterName:
                    logger.info('overwriting and uploading processingClusterName to otdb from %s to %s for otdb_id=%s',
                                processingClusterName, clusterName, otdb_id)
                    self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName })
            except Exception as e:
                logger.error(e)

        # NOTE(review): startTime and endTime are only assigned in the CEP4 branch above, so for a
        # non-CEP4 specification the statements below look like they raise a NameError - TODO confirm
        # the intended behaviour for other clusters.

        # insert new task and specification in the radb
        # any existing specification and task with same otdb_id will be deleted automatically
        logger.info('doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s cluster=%s' %
                    (momId, otdb_id, status, taskType, startTime, endTime, clusterName))
        result = self.radbrpc.insertSpecificationAndTask(momId, otdb_id, status, taskType, startTime, endTime, str(mainParset), clusterName)

        if not result['inserted']:
            logger.error('could not insert specification and task')
            return

        specificationId = result['specification_id']
        taskId = result['task_id']
        logger.info('doAssignment: inserted specification (id=%s) and task (id=%s)' % (specificationId,taskId))

        task = self.radbrpc.getTask(taskId)

        self.processPredecessors(task)
        self.processSuccessors(task)

        # do not assign resources to task for other clusters than cep4
        if not clusterIsCEP4:
            return

        if status != 'prescheduled':
            logger.info('skipping resource assignment for CEP4 task otdb_id=%s because status=%s' % (otdb_id, status))
            return

        needed = self.getNeededResouces(specification_tree)
        logger.info('doAssignment: getNeededResouces=%s' % (needed,))

        if not str(otdb_id) in needed:
            logger.error("no otdb_id %s found in estimator results %s" % (otdb_id, needed))
            return

        if not taskType in needed[str(otdb_id)]:
            logger.error("no task type %s found in estimator results %s" % (taskType, needed[str(otdb_id)]))
            return

        # claim the resources for this task
        # during the claim inserts the claims are automatically validated
        # and if not enough resources are available, then they are put to conflict status
        # also, if any claim is in conflict state, then the task is put to conflict status as well
        main_needed = needed[str(otdb_id)]

        if 'errors' in main_needed and main_needed['errors']:
            for error in main_needed['errors']:
                logger.error("Error in estimator: %s", error)

            logger.error("Error(s) in estimator for otdb_id=%s radb_id=%s, setting task status to 'error'", otdb_id, taskId)
            self.radbrpc.updateTask(taskId, status='error')
            self._sendNotification(task, 'error')
            return

        claimed, claim_ids = self.claimResources(main_needed, task)

        if not claimed:
            logger.warning('doAssignment: Not all claims could be inserted. Setting task %s status to conflict' % (taskId))
            self.radbrpc.updateTask(taskId, status='conflict')
            self._sendNotification(task, 'conflict')
            return

        conflictingClaims = self.radbrpc.getResourceClaims(task_ids=taskId, status='conflict')

        if conflictingClaims:
            # radb set's task status to conflict automatically
            logger.warning('doAssignment: %s conflicting claims detected. Task cannot be scheduled. %s' %
                           (len(conflictingClaims), conflictingClaims))
            self._sendNotification(task, 'conflict')
            return

        logger.info('doAssignment: all claims for task %s were succesfully claimed. Setting claim statuses to allocated' % (taskId,))
        self.radbrpc.updateTaskAndResourceClaims(taskId, claim_status='allocated')

        # remove any output and/or intermediate data for restarting CEP4 pipelines
        if task['type'] == 'pipeline':
            path_result = self.curpc.getPathForOTDBId(task['otdb_id'])
            if path_result['found']:
                logger.info("removing data on disk from previous run for otdb_id %s", otdb_id)
                removal_result = self.curpc.removeTaskData(task['otdb_id'])

                if not removal_result['deleted']:
                    logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s", otdb_id, removal_result['message'])

        # send notification that the task was scheduled,
        # another service sets the parset spec in otdb, and updated otdb task status to scheduled, which is then synced to radb
        self._sendNotification(task, 'scheduled')

    def _sendNotification(self, task, status):
        """
        Send a 'Task<Status>' event message for the given task on the notification bus.

        Only the statuses 'scheduled', 'conflict' and 'error' result in a message; any other
        status is ignored. Exceptions are logged and not re-raised.

        :param task: dict with at least the keys 'id', 'otdb_id' and 'mom_id'
        :param status: the status to report
        """
        try:
            if status in ('scheduled', 'conflict', 'error'):
                content = {'radb_id': task['id'], 'otdb_id': task['otdb_id'], 'mom_id': task['mom_id']}
                # e.g. 'scheduled' -> 'TaskScheduled'
                subject = 'Task' + status[0].upper() + status[1:]
                msg = EventMessage(context=self.ra_notification_prefix + subject, content=content)
                logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' ')))
                self.ra_notification_bus.send(msg)
        except Exception as e:
            logger.error(str(e))

    def processPredecessors(self, task):
        """
        Link the given task in the radb to each of its predecessors as known by the mom query service.

        Predecessors that cannot be found in the radb are reported with a warning and skipped.
        Exceptions are logged and not re-raised.

        :param task: dict with at least the keys 'id', 'mom_id', 'otdb_id' and 'predecessor_ids'
        """
        try:
            predecessor_mom_ids = self.momrpc.getPredecessorIds(task['mom_id'])[str(task['mom_id'])]

            if predecessor_mom_ids:
                logger.info('proccessing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, task['mom_id'], task['otdb_id'])

                for predecessor_mom_id in predecessor_mom_ids:
                    #check if the predecessor needs to be linked to this task
                    predecessor_task = self.radbrpc.getTask(mom_id=predecessor_mom_id)
                    if predecessor_task:
                        if predecessor_task['id'] not in task['predecessor_ids']:
                            logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to it\'s successor with mom_id=%s otdb_id=%s', predecessor_task['mom_id'],
                                                                                                                                                  predecessor_task['otdb_id'],
                                                                                                                                                  task['mom_id'],
                                                                                                                                                  task['otdb_id'])
                            self.radbrpc.insertTaskPredecessor(task['id'], predecessor_task['id'])
                    else:
                        # there is no task to read an otdb_id from here, so report the mom_id that was looked up
                        logger.warning('could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', predecessor_mom_id, task['otdb_id'])
            else:
                logger.info('no predecessors for otdb_id=%s', task['otdb_id'])
        except Exception as e:
            logger.error(e)

    def processSuccessors(self, task):
        """
        Link each successor of the given task, as known by the mom query service, to this task in
        the radb, and move the successor after its predecessors.

        Successors that cannot be found in the radb are reported with a warning and skipped.
        Exceptions are logged and not re-raised.

        :param task: dict with at least the keys 'id', 'mom_id', 'otdb_id' and 'successor_ids'
        """
        try:
            successor_mom_ids = self.momrpc.getSuccessorIds(task['mom_id'])[str(task['mom_id'])]

            if successor_mom_ids:
                logger.info('proccessing successor mom_ids=%s for mom_id=%s otdb_id=%s', successor_mom_ids, task['mom_id'], task['otdb_id'])

                for successor_mom_id in successor_mom_ids:
                    #check if the successor needs to be linked to this task
                    successor_task = self.radbrpc.getTask(mom_id=successor_mom_id)
                    if successor_task:
                        if successor_task['id'] not in task['successor_ids']:
                            logger.info('connecting successor task with mom_id=%s otdb_id=%s to it\'s predecessor with mom_id=%s otdb_id=%s', successor_task['mom_id'],
                                                                                                                                                successor_task['otdb_id'],
                                                                                                                                                task['mom_id'],
                                                                                                                                                task['otdb_id'])
                            self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id'])
                        movePipelineAfterItsPredecessors(successor_task, self.radbrpc)
                    else:
                        # there is no task to read an otdb_id from here, so report the mom_id that was looked up
                        logger.warning('could not find successor task with mom_id=%s in radb for task otdb_id=%s', successor_mom_id, task['otdb_id'])
            else:
                logger.info('no successors for otdb_id=%s', task['otdb_id'])
        except Exception as e:
            logger.error(e)

    def getMaxPredecessorEndTime(self, specification_tree):
        """
        Return the latest 'Observation.stopTime' found in the predecessor specifications.

        :param specification_tree: dict of which the 'predecessors' list is read; each item is a
                                   dict with a 'specification' key.
        :return: datetime of the latest predecessor stop time, or None when there are no
                 predecessors or when anything goes wrong (the exception is logged).
        """
        try:
            predecessor_specs = [parameterset(tree['specification']) for tree in specification_tree['predecessors']]
            predecessor_endTimes = [datetime.strptime(spec.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S') for spec in predecessor_specs]

            if predecessor_endTimes:
                return max(predecessor_endTimes)
        except Exception as e:
            logger.error(e)
        return None

    def checkClusterIsCEP4(self, parset):
        """
        Check whether all enabled data products in the parset are stored on CEP4.

        :param parset: parset providing getBool and getString
        :return: False as soon as an enabled data product has a storageClusterName other than
                 'CEP4', else True (also when no data product is enabled at all).
        """
        # check storageClusterName for enabled DataProducts
        # if any storageClusterName is not CEP4, we do not accept this parset
        keys = ['Output_Correlated',
                'Output_IncoherentStokes',
                'Output_CoherentStokes',
                'Output_InstrumentModel',
                'Output_SkyImage',
                'Output_Pulsar']
        for key in keys:
            if parset.getBool('Observation.DataProducts.%s.enabled' % key, False):
                if parset.getString('Observation.DataProducts.%s.storageClusterName' % key, '') != 'CEP4':
                    logger.warning("storageClusterName not CEP4, rejecting specification.")
                    return False

        logger.info("all enabled storageClusterName's are CEP4, accepting specification.")
        return True

    def getNeededResouces(self, specification_tree):
        """
        Ask the resource estimator service which resources the given specification needs.

        :param specification_tree: the specification tree, passed on as is to the estimator
        :return: the reply message of the estimator; doAssignment reads it as a dict keyed by
                 the otdb_id as string.
        """
        replymessage, status = self.rerpc({"specification_tree":specification_tree}, timeout=10)
        logger.info('getNeededResouces: %s' % replymessage)
        return replymessage

    def claimResources(self, needed_resources, task):
        """
        Convert the needed resources for the task's type into claims and insert these in the radb.

        :param needed_resources: dict keyed by task type; the value for task['type'] maps each
                                 resource type name to a dict with the claim value (an int) and
                                 optionally sub-dicts with groups of claim properties.
        :param task: dict with at least the keys 'id', 'type', 'starttime' and 'endtime'
        :return: tuple (all_inserted, claim_ids), where all_inserted is True when the radb
                 returned as many claim ids as claims were offered.
        """
        logger.info('claimResources: task %s needed_resources=%s' % (task, needed_resources))

        # get the needed resources for the task type
        needed_resources_for_task_type = needed_resources[task['type']]

        # get db lists
        rc_property_types = {rcpt['name']:rcpt['id'] for rcpt in self.radbrpc.getResourceClaimPropertyTypes()}
        resource_types = {rt['name']:rt['id'] for rt in self.radbrpc.getResourceTypes()}
        resources = self.radbrpc.getResources()

        def processProperties(claim, propertiesDict, sap_nr=None, is_input=False):
            # append each known property in propertiesDict to the claim; unknown types are logged and skipped
            for prop_type_name, prop_value in propertiesDict.items():
                if prop_type_name in rc_property_types:
                    claim_property = {'type':rc_property_types[prop_type_name],
                                      'value':prop_value,
                                      'io_type': 'input' if is_input else 'output'}
                    if sap_nr is not None:
                        claim_property['sap_nr'] = sap_nr
                    claim['properties'].append(claim_property)
                else:
                    logger.error('claimResources: unknown prop_type:%s' % prop_type_name)

        # loop over needed_resources -> resource_type -> claim (and props)
        # flatten the tree dict to a list of claims (with props)
        claims = []
        for resource_type_name, needed_claim_for_resource_type in needed_resources_for_task_type.items():
            if resource_type_name not in resource_types:
                logger.error('claimResources: unknown resource_type:%s' % resource_type_name)
                continue

            logger.info('claimResources: processing resource_type: %s contents: %s' % (resource_type_name, needed_claim_for_resource_type))
            db_resource_type_id = resource_types[resource_type_name]
            db_resources_for_type = [r for r in resources if r['type_id'] == db_resource_type_id]

            # needed_claim_for_resource_type is a dict containing exactly one kvp of which the value is an int
            # that value is the value for the claim
            needed_claim_value = next((v for k,v in needed_claim_for_resource_type.items() if isinstance(v, int)))

            # FIXME: right now we just pick the first resource from the 'cep4' resources.
            # estimator will deliver this info in the future
            db_cep4_resources_for_type = [r for r in db_resources_for_type if 'cep4' in r['name'].lower()]

            if not db_cep4_resources_for_type:
                # no claim is made for this resource type; report it instead of skipping silently
                logger.warning('claimResources: no cep4 resource found for resource_type:%s' % resource_type_name)
                continue

            cep4_resource = db_cep4_resources_for_type[0]
            claim = {'resource_id':cep4_resource['id'],
                     'starttime':task['starttime'],
                     'endtime':task['endtime'],
                     'status':'claimed',
                     'claim_size':needed_claim_value}

            #FIXME: find proper way to extend storage time with a month
            if 'storage' in cep4_resource['name']:
                claim['endtime'] += timedelta(days=31)

            # if the needed_claim_for_resource_type dict contains more kvp's,
            # then the subdicts contains groups of properties for the claim
            if len(needed_claim_for_resource_type) > 1:
                claim['properties'] = []

                subdicts = {k:v for k,v in needed_claim_for_resource_type.items() if isinstance(v, dict)}
                for subdict_name, subdict in subdicts.items():
                    logger.info('claimResources: processing resource_type: %s subdict_name: \'%s\' subdict_contents: %s' % (resource_type_name, subdict_name, subdict))
                    is_input = 'input' in subdict_name.lower()
                    for group_name, needed_prop_group in subdict.items():
                        if group_name == 'saps':
                            for sap_dict in needed_prop_group:
                                processProperties(claim, sap_dict['properties'], sap_dict['sap_nr'], is_input)
                        else:
                            processProperties(claim, needed_prop_group, None, is_input)

            logger.info('claimResources: created claim:%s' % claim)
            claims.append(claim)

        logger.info('claimResources: inserting %d claims in the radb' % len(claims))
        claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids']
        logger.info('claimResources: %d claims were inserted in the radb' % len(claim_ids))

        return len(claim_ids) == len(claims), claim_ids