Select Git revision
oskarelementresponse.cc
Forked from
ResearchAndDevelopment / EveryBeam
Source project has a limited visibility.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
propagator.py 10.76 KiB
#!/usr/bin/env python
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: assignment.py 1580 2015-09-30 14:18:57Z loose $
"""
RAtoOTDBTaskSpecificationPropagator takes a task to be scheduled in OTDB,
reads its info from the RADB (and its project name from MoM), and sends it
to OTDB in the correct format.
"""
import logging
import datetime
import time
from lofar.messaging.RPC import RPC, RPCException
from lofar.parameterset import parameterset
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC as RADBRPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.translator import RAtoOTDBTranslator
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
# Module-level logger, named after this module so its records can be
# filtered/configured per module.
logger = logging.getLogger(name=__name__)
class RAtoOTDBPropagator(object):
    """Propagate Resource Assigner results for a task into OTDB.

    Holds RPC clients for the RADB service, the OTDB service and the MoMQuery
    service, plus an RAtoOTDBTranslator that turns RADB task info into an OTDB
    parset. Use it as a context manager (``with RAtoOTDBPropagator() as p:``)
    or call open()/close() explicitly so the RPC connections are managed.
    """

    def __init__(self,
                 radb_busname=RADB_BUSNAME,
                 radb_servicename=RADB_SERVICENAME,
                 radb_broker=None,
                 otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
                 otdb_servicename=DEFAULT_OTDB_SERVICENAME,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 otdb_broker=None,
                 mom_broker=None,
                 broker=None):
        """
        RAtoOTDBPropagator updates tasks in the OTDB after the ResourceAssigner is done with them.
        :param radb_busname: busname on which the radb service listens (default: lofar.ra.command)
        :param radb_servicename: servicename of the radb service (default: RADBService)
        :param radb_broker: valid Qpid broker host (default: None, which means localhost)
        :param otdb_busname: busname on which the OTDB service listens (default: lofar.otdb.command)
        :param otdb_servicename: servicename of the OTDB service (default: OTDBService)
        :param mom_busname: busname on which the MoMQuery service listens (default: DEFAULT_MOMQUERY_BUSNAME)
        :param mom_servicename: servicename of the MoMQuery service (default: DEFAULT_MOMQUERY_SERVICENAME)
        :param otdb_broker: valid Qpid broker host (default: None, which means localhost)
        :param mom_broker: valid Qpid broker host (default: None, which means localhost)
        :param broker: if specified, overrules radb_broker, otdb_broker and mom_broker.
                       Valid Qpid broker host (default: None, which means localhost)
        """
        if broker:
            radb_broker = broker
            otdb_broker = broker
            mom_broker = broker

        # ForwardExceptions=True is hardcoded in RPCWrapper right now, so it is not passed here.
        self.radbrpc = RADBRPC(busname=radb_busname, servicename=radb_servicename, broker=radb_broker)
        self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=otdb_broker)
        self.momrpc = MoMQueryRPC(busname=mom_busname, servicename=mom_servicename, broker=mom_broker)
        self.translator = RAtoOTDBTranslator()

    def __enter__(self):
        """Internal use only. (handles scope 'with')"""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Internal use only. (handles scope 'with')"""
        self.close()

    def open(self):
        """Open the RPC connections to the RADB, OTDB and MoMQuery services."""
        self.radbrpc.open()
        self.otdbrpc.open()
        self.momrpc.open()

    def close(self):
        """Close the RPC connections to the RADB, OTDB and MoMQuery services.

        Every connection is closed even if closing an earlier one raises; the
        first exception is then re-raised after the remaining closes ran.
        """
        try:
            self.radbrpc.close()
        finally:
            try:
                self.otdbrpc.close()
            finally:
                self.momrpc.close()

    def _setTaskStatus(self, otdb_id, status, caller):
        """Set the OTDB status of task otdb_id to status (best effort).

        :param otdb_id: OTDB task id; a falsy value is logged and ignored.
        :param status: OTDB status name to set, e.g. 'conflict' or 'error'.
        :param caller: name of the public method, used as log-message prefix.
        Errors from the OTDB service are logged (with traceback), not raised.
        """
        logger.info('%s: otdb_id=%s', caller, otdb_id)
        if not otdb_id:
            logger.warning('%s no valid otdb_id: otdb_id=%s', caller, otdb_id)
            return
        try:
            self.otdbrpc.taskSetStatus(otdb_id, status)
        except Exception:
            # Deliberately best effort: these are called from error paths.
            logger.exception('%s: could not set status %s for otdb_id=%s', caller, status, otdb_id)

    def doTaskConflict(self, otdb_id):
        """Set the OTDB status of otdb_id to 'conflict' (best effort, never raises)."""
        self._setTaskStatus(otdb_id, 'conflict', 'doTaskConflict')

    def doTaskError(self, otdb_id):
        """Set the OTDB status of otdb_id to 'error' (best effort, never raises)."""
        self._setTaskStatus(otdb_id, 'error', 'doTaskError')

    def _getProjectName(self, mom_id):
        """Return the MoM project name of mom_id, usable in paths/parsets.

        Whitespace runs in the name are replaced by a single '_' (leading and
        trailing whitespace is dropped). Returns 'unknown' when the name cannot
        be obtained: RPC failure, mom_id missing from the reply, or an
        unexpectedly shaped reply (e.g. None or a non-string name).
        """
        try:
            project = self.momrpc.getProjectDetails(mom_id)
            logger.info('MoM project details for mom_id=%s: %s', mom_id, project)
            return "_".join(project[str(mom_id)]['project_name'].split())
        except (RPCException, KeyError, TypeError, AttributeError) as e:
            logger.error('Could not get project name from MoM for mom_id %s: %s', mom_id, e)
            logger.info("Using 'unknown' as project name.")
            return 'unknown'

    def doTaskScheduled(self, ra_id, otdb_id, mom_id):
        """Write the RA specification of a scheduled task to OTDB and set it to 'scheduled'.

        :param ra_id: RADB task id, used to fetch task info and resource claims.
        :param otdb_id: OTDB task id to update; a falsy value is logged and ignored.
        :param mom_id: MoM id, used to look up the project name.
        Tasks without a (CEP4) storage claim are skipped. Any failure is logged
        and the OTDB task is set to 'conflict'; nothing is raised.
        """
        try:
            logger.info('doTaskScheduled: ra_id=%s otdb_id=%s mom_id=%s', ra_id, otdb_id, mom_id)
            if not otdb_id:
                logger.warning('doTaskScheduled no valid otdb_id: otdb_id=%s', otdb_id)
                return

            ra_info = self.getRAinfo(ra_id)
            logger.info('RA info for ra_id=%s otdb_id=%s: %s', ra_id, otdb_id, ra_info)

            # check if this is a CEP4 task, or an old CEP2 task
            # at this moment the most simple check is to see if RA claimed (CEP4) storage
            # TODO: do proper check on cluster/storage/etc
            if not ra_info['storage']:
                logger.info("No (CEP4) storage claimed for ra_id=%s otdb_id=%s, skipping otdb specification update.",
                            ra_id, otdb_id)
                return

            project_name = self._getProjectName(mom_id)
            otdb_info = self.translator.CreateParset(otdb_id, ra_info, project_name)
            logger.info("Parset info for OTDB: %s", otdb_info)
            self.setOTDBinfo(otdb_id, otdb_info, 'scheduled')
        except Exception:
            logger.exception('doTaskScheduled failed for ra_id=%s otdb_id=%s', ra_id, otdb_id)
            self.doTaskConflict(otdb_id)

    def ParseStorageProperties(self, storage_claim):
        """Split the properties of a storage claim into input and output files.

        Only properties whose 'io_type_name' is 'input' or 'output' are used;
        each contributes ``{type_name: value}``. Example input claim (abridged):

            {u'resource_type_name': u'storage', u'claim_size': 24000, ...,
             u'properties': [
                {u'io_type_name': u'output', u'type_name': u'img_file_size', u'value': 1000, ...},
                {u'io_type_name': u'output', u'type_name': u'nr_of_img_files', u'value': 24, ...},
                {u'io_type_name': u'input', u'type_name': u'nr_of_uv_files', u'value': 240, ...},
                {u'io_type_name': u'input', u'type_name': u'uv_file_size', u'value': 43957416, ...}]}

        gives:

            {'input_files': {u'nr_of_uv_files': 240, u'uv_file_size': 43957416},
             'output_files': {u'img_file_size': 1000, u'nr_of_img_files': 24}}

        If the claim has a 'saps' list, every SAP that carries at least one
        input/output property becomes ``{'sap_nr': ..., 'properties': {...}}``.
        That list is stored under 'saps' in 'input_files' if any SAP property
        was an input, and in 'output_files' if any was an output (the same
        list object in both cases).
        """
        result = {'input_files': {}, 'output_files': {}}

        # FIXME This is very fragile code, mainly because we don't know if 'saps' are part of the input or output.
        # This should probably be redesigned, but might require changes in how RADB works.
        if 'saps' in storage_claim:
            has_input = False
            has_output = False
            saps = []
            for sap in storage_claim['saps']:
                properties = {}
                for prop in sap['properties']:
                    io_type = prop['io_type_name']
                    if io_type == 'output':
                        has_output = True
                    elif io_type == 'input':
                        has_input = True
                    else:
                        continue
                    properties[prop['type_name']] = prop['value']
                # Decide per SAP: previously the flags leaked from earlier SAPs,
                # so SAPs without any input/output property were appended too.
                if properties:
                    saps.append({'sap_nr': sap['sap_nr'], 'properties': properties})
            if saps:
                if has_input:
                    result['input_files']['saps'] = saps
                if has_output:
                    result['output_files']['saps'] = saps

        for prop in storage_claim.get('properties') or []:
            if prop['io_type_name'] == 'output':
                result['output_files'][prop['type_name']] = prop['value']
            elif prop['io_type_name'] == 'input':
                result['input_files'][prop['type_name']] = prop['value']

        logger.info('Parsed storage properties: %s', result)
        return result

    def getRAinfo(self, ra_id):
        """Collect the RADB info for task ra_id needed to build an OTDB parset.

        Returns a dict with 'storage' (ParseStorageProperties() of the storage
        claim, or {} when there is none) and the task's 'starttime', 'endtime',
        'status' and 'type'.
        :raises ValueError: if the RADB returns no task for ra_id.
        """
        task = self.radbrpc.getTask(ra_id)
        if not task:
            raise ValueError('No task found in RADB for ra_id=%s' % (ra_id,))

        info = {'storage': {}}
        claims = self.radbrpc.getResourceClaims(task_ids=ra_id, extended=True, include_properties=True)
        for claim in claims or []:
            logger.debug("Processing claim: %s", claim)
            # TODO we will need to check for different storage names/types in the future.
            # NOTE: with several storage claims, the last one processed wins.
            if claim['resource_type_name'] == 'storage':
                info['storage'] = self.ParseStorageProperties(claim)

        for key in ("starttime", "endtime", "status", "type"):
            info[key] = task[key]
        return info

    def setOTDBinfo(self, otdb_id, otdb_info, otdb_status):
        """Send a parset to OTDB, prepare the task for scheduling and set its status.

        :param otdb_id: OTDB task id.
        :param otdb_info: parset dict; must contain 'LOFAR.ObsSW.Observation.startTime'
                          and 'LOFAR.ObsSW.Observation.stopTime'.
        :param otdb_status: OTDB status to set once the specification is stored.
        Any failure is logged and the task is set to 'conflict'; nothing is raised.
        """
        try:
            logger.info('Setting specification for otdb_id %s: %s', otdb_id, otdb_info)
            self.otdbrpc.taskSetSpecification(otdb_id, otdb_info)
            self.otdbrpc.taskPrepareForScheduling(otdb_id,
                                                  otdb_info["LOFAR.ObsSW.Observation.startTime"],
                                                  otdb_info["LOFAR.ObsSW.Observation.stopTime"])
            logger.info('Setting status (%s) for otdb_id %s', otdb_status, otdb_id)
            self.otdbrpc.taskSetStatus(otdb_id, otdb_status)
        except Exception:
            logger.exception('Could not update specification/status for otdb_id %s', otdb_id)
            self.doTaskConflict(otdb_id)