-
Adriaan Renting authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
propagator.py 6.33 KiB
#!/usr/bin/env python
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: assignment.py 1580 2015-09-30 14:18:57Z loose $
"""
RAtoOTDBTaskSpecificationPropagator gets a task to be scheduled in OTDB,
reads the info from the RA DB and sends it to OTDB in the correct format.
"""
import logging
import datetime
import time
from lofar.messaging.RPC import RPC, RPCException
from lofar.parameterset import parameterset
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC as RADBRPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.translator import RAtoOTDBTranslator
from lofar.mom.momqueryservice.momqueryrpc import MoMRPC
from lofar.mom.momqueryservice.config import DEFAULT_BUSNAME as DEFAULT_MOM_BUSNAME
from lofar.mom.momqueryservice.config import DEFAULT_SERVICENAME as DEFAULT_MOM_SERVICENAME
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class RAtoOTDBPropagator(object):
    """Propagate the outcome of resource assignment for a task to OTDB.

    Holds RPC connections to the RADB, OTDB and MoM query services and can be
    used as a context manager: entering opens all three connections, leaving
    closes them again.
    """

    def __init__(self,
                 radb_busname=RADB_BUSNAME,
                 radb_servicename=RADB_SERVICENAME,
                 radb_broker=None,
                 otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
                 otdb_servicename=DEFAULT_OTDB_SERVICENAME,
                 mom_busname=DEFAULT_MOM_BUSNAME,
                 mom_servicename=DEFAULT_MOM_SERVICENAME,
                 otdb_broker=None,
                 mom_broker=None,
                 broker=None):
        """
        RAtoOTDBPropagator updates tasks in the OTDB after the ResourceAssigner is done with them.
        :param radb_busname: busname on which the radb service listens (default: RADB_BUSNAME from the radb service config)
        :param radb_servicename: servicename of the radb service (default: RADB_SERVICENAME from the radb service config)
        :param radb_broker: valid Qpid broker host (default: None, which means localhost)
        :param otdb_busname: busname on which the OTDB service listens (default: DEFAULT_OTDB_SERVICE_BUSNAME)
        :param otdb_servicename: servicename of the OTDB service (default: DEFAULT_OTDB_SERVICENAME)
        :param mom_busname: busname on which the MoM query service listens (default: DEFAULT_MOM_BUSNAME)
        :param mom_servicename: servicename of the MoM query service (default: DEFAULT_MOM_SERVICENAME)
        :param otdb_broker: valid Qpid broker host (default: None, which means localhost)
        :param mom_broker: valid Qpid broker host (default: None, which means localhost)
        :param broker: if specified, overrules radb_broker, otdb_broker and mom_broker. Valid Qpid broker host (default: None, which means localhost)
        """
        if broker:
            radb_broker = broker
            otdb_broker = broker
            mom_broker = broker

        # NOTE: ForwardExceptions=True is hardcoded in RPCWrapper right now,
        # so it is not passed to the RADB/OTDB rpc constructors here.
        self.radbrpc = RADBRPC(busname=radb_busname, servicename=radb_servicename, broker=radb_broker)
        self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=otdb_broker)
        self.momrpc = MoMRPC(busname=mom_busname, servicename=mom_servicename, broker=mom_broker)
        self.translator = RAtoOTDBTranslator()

    def __enter__(self):
        """Internal use only. (handles scope 'with')"""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Internal use only. (handles scope 'with')"""
        self.close()

    def open(self):
        """Open rpc connections to the radb, otdb and mom query services.

        If opening one of the connections fails, the connections that were
        already opened are closed again before the error is re-raised, so a
        failed open() (or a failed 'with' entry, for which __exit__ is never
        called) does not leave connections behind.
        """
        opened = []
        try:
            for rpc in (self.radbrpc, self.otdbrpc, self.momrpc):
                rpc.open()
                opened.append(rpc)
        except Exception:
            # Roll back in reverse order of opening, then propagate the error.
            for rpc in reversed(opened):
                rpc.close()
            raise

    def close(self):
        """Close rpc connections to the radb, otdb and mom query services.

        Every connection is attempted even if closing an earlier one raises;
        the error still propagates to the caller afterwards.
        """
        try:
            self.radbrpc.close()
        finally:
            try:
                self.otdbrpc.close()
            finally:
                self.momrpc.close()

    def doTaskConflict(self, ra_id, otdb_id, mom_id):
        """Set the status of the OTDB task to 'conflict'.

        :param ra_id: id of the task in the RADB (not used by this method)
        :param otdb_id: id of the task in OTDB; nothing is done if it is falsy
        :param mom_id: id of the task in MoM (only logged)
        """
        logger.info('doTaskConflict: otdb_id=%s mom_id=%s', otdb_id, mom_id)
        if not otdb_id:
            logger.warning('doTaskConflict no valid otdb_id: otdb_id=%s', otdb_id)
            return
        self.otdbrpc.taskSetStatus(otdb_id, 'conflict')

    def doTaskScheduled(self, ra_id, otdb_id, mom_id):
        """Write the RA info of a task to OTDB and set it to 'scheduled'.

        :param ra_id: id of the task in the RADB, used to fetch task and claims
        :param otdb_id: id of the task in OTDB; nothing is done if it is falsy
        :param mom_id: id of the task in MoM, used to look up the project name
        """
        logger.info('doTaskScheduled: otdb_id=%s mom_id=%s', otdb_id, mom_id)
        if not otdb_id:
            logger.warning('doTaskScheduled no valid otdb_id: otdb_id=%s', otdb_id)
            return
        ra_info = self.getRAinfo(ra_id)
        project = self.momrpc.getProjectDetails(mom_id)
        # Replace every run of whitespace in the project name by one underscore.
        project_name = "_".join(project['project_name'].split())
        otdb_info = self.translator.CreateParset(otdb_id, ra_info, project_name)
        logger.debug("Parset info for OTDB: %s", otdb_info)
        self.setOTDBinfo(otdb_id, otdb_info, 'scheduled')

    def getRAinfo(self, ra_id):
        """Collect the RADB info needed to build the OTDB parset of a task.

        :param ra_id: id of the task in the RADB
        :return: dict with keys 'storage' (the storage claim, or {} if the
                 task has none), 'starttime', 'endtime' and 'status'
        """
        task = self.radbrpc.getTask(ra_id)
        claims = self.radbrpc.getResourceClaims(task_id=ra_id, extended=True, include_properties=True)

        info = {"storage": {}}
        for claim in claims:
            logger.debug("Processing claim: %s", claim)
            # If a task has several storage claims, the last one wins.
            if claim['resource_type_name'] == 'storage':
                info['storage'] = claim

        info["starttime"] = task["starttime"]
        info["endtime"] = task["endtime"]
        info["status"] = task["status"]
        return info

    def setOTDBinfo(self, otdb_id, otdb_info, otdb_status):
        """Store the specification in OTDB, prepare the task and set its status.

        :param otdb_id: id of the task in OTDB
        :param otdb_info: dict of parset keys/values; must contain
                          'LOFAR.ObsSW.Observation.startTime' and
                          'LOFAR.ObsSW.Observation.stopTime'
        :param otdb_status: status to give the task after it has been prepared
        """
        start_time = otdb_info["LOFAR.ObsSW.Observation.startTime"]
        stop_time = otdb_info["LOFAR.ObsSW.Observation.stopTime"]
        self.otdbrpc.taskSetSpecification(otdb_id, otdb_info)
        self.otdbrpc.taskPrepareForScheduling(otdb_id, start_time, stop_time)
        self.otdbrpc.taskSetStatus(otdb_id, otdb_status)