diff --git a/LCS/Messaging/python/messaging/__init__.py b/LCS/Messaging/python/messaging/__init__.py
index 5333362ac59e6e8dc84ac00760be58f793c08e6b..f38fe66bee825999ec41aed30cc47accfcb96899 100644
--- a/LCS/Messaging/python/messaging/__init__.py
+++ b/LCS/Messaging/python/messaging/__init__.py
@@ -30,7 +30,7 @@ from messagebus import *
 from RPC import *
 from Service import *
 import logging
-import os
+from lofar.common import isProductionEnvironment, isTestEnvironment
 
@@ -38,10 +38,10 @@ def setQpidLogLevel(qpidLogLevel):
     for name, logger in logging.Logger.manager.loggerDict.items():
         logger.setLevel(qpidLogLevel)
 
 def adaptNameToEnvironment(name):
-    if os.environ.get('LOFARENV', '') == 'PRODUCTION':
+    if isProductionEnvironment():
         return name #return original name only for PRODUCTION LOFARENV
 
-    if os.environ.get('LOFARENV', '') == 'TEST':
+    if isTestEnvironment():
         return 'test.%s' % name #return 'test.' prefixed name only for TEST LOFARENV
 
     # in all other cases prefix queue/bus name with 'devel.'
diff --git a/LCS/PyCommon/CMakeLists.txt b/LCS/PyCommon/CMakeLists.txt
index 42cf8512e8926b7335609fc647cfa424befb37e8..c3803fdaec5f5032ec5480b41bf087cae392250b 100644
--- a/LCS/PyCommon/CMakeLists.txt
+++ b/LCS/PyCommon/CMakeLists.txt
@@ -8,6 +8,7 @@ include(PythonInstall)
 add_subdirectory(test)
 
 set(_py_files
+  __init__.py
   dbcredentials.py
   factory.py
   util.py
diff --git a/LCS/PyCommon/__init__.py b/LCS/PyCommon/__init__.py
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0453289abdc17d088ea23afa2eeef7d2e43dd2d8 100644
--- a/LCS/PyCommon/__init__.py
+++ b/LCS/PyCommon/__init__.py
@@ -0,0 +1,39 @@
+# __init__.py: Module initialization file.
+#
+# Copyright (C) 2015
+# ASTRON (Netherlands Institute for Radio Astronomy)
+# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
+#
+# This file is part of the LOFAR software suite.
+# The LOFAR software suite is free software: you can redistribute it
+# and/or modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# The LOFAR software suite is distributed in the hope that it will be
+# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
+#
+# $Id: __init__.py 1568 2015-09-18 15:21:11Z loose $
+
+"""
+Module initialization file.
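+
+The helpers below inspect the LOFARENV environment variable. A small usage
+sketch, mirroring how messaging.adaptNameToEnvironment picks its queue/bus
+name prefix (the prefix variable here is illustrative only):
+
+    from lofar.common import isProductionEnvironment, isTestEnvironment
+
+    if isProductionEnvironment():
+        prefix = ''        # production keeps the original name
+    elif isTestEnvironment():
+        prefix = 'test.'
+    else:
+        prefix = 'devel.'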
+""" + +import os + +def isProductionEnvironment(): + '''check if the program is running in a lofar producution environment''' + return os.environ.get('LOFARENV', '') == 'PRODUCTION' + +def isTestEnvironment(): + '''check if the program is running in a lofar test environment''' + return os.environ.get('LOFARENV', '') == 'TEST' + +def isDevelopmentEnvironment(): + '''check if the program is running in a lofar development (not production or test) environment''' + return not (isProductionEnvironment() or isTestEnvironment()) diff --git a/SAS/MoM/MoMQueryService/config.py b/SAS/MoM/MoMQueryService/config.py index e846acfbd04c3411d2117538c4859461d99c6f22..a5b66e022d8f044d04607ee5637fa55e1c0adebc 100644 --- a/SAS/MoM/MoMQueryService/config.py +++ b/SAS/MoM/MoMQueryService/config.py @@ -3,5 +3,5 @@ from lofar.messaging import adaptNameToEnvironment -DEFAULT_BUSNAME = adaptNameToEnvironment('lofar.ra.command') -DEFAULT_SERVICENAME = 'momqueryservice' +DEFAULT_MOMQUERY_BUSNAME = adaptNameToEnvironment('lofar.ra.command') +DEFAULT_MOMQUERY_SERVICENAME = 'momqueryservice' diff --git a/SAS/MoM/MoMQueryService/momqueryrpc.py b/SAS/MoM/MoMQueryService/momqueryrpc.py index 0514fa2d3dd8529ba8fee881fee3dbd1766e82a7..b9a9548f8d68f99d8eb5e5e3ee0c466075f58949 100644 --- a/SAS/MoM/MoMQueryService/momqueryrpc.py +++ b/SAS/MoM/MoMQueryService/momqueryrpc.py @@ -4,7 +4,7 @@ import sys import logging from optparse import OptionParser from lofar.messaging.RPC import RPC, RPCException, RPCWrapper -from lofar.mom.momqueryservice.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME +from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME ''' Simple RPC client for Service momqueryservice ''' @@ -12,7 +12,7 @@ from lofar.mom.momqueryservice.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAM logger = logging.getLogger(__file__) -class MoMRPC(RPCWrapper): +class MoMQueryRPC(RPCWrapper): def getProjectDetails(self, ids): '''get the project details for one or more mom ids :param ids single or list of mom ids @@ -44,8 +44,8 @@ def main(): parser = OptionParser('%prog [options]', description='do requests to the momqueryservice from the commandline') parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') - parser.add_option('-b', '--busname', dest='busname', type='string', default=DEFAULT_BUSNAME, help='Name of the bus exchange on the qpid broker, default: %s' % DEFAULT_BUSNAME) - parser.add_option('-s', '--servicename', dest='servicename', type='string', default=DEFAULT_SERVICENAME, help='Name for this service, default: %s' % DEFAULT_SERVICENAME) + parser.add_option('-b', '--busname', dest='busname', type='string', default=DEFAULT_MOMQUERY_BUSNAME, help='Name of the bus exchange on the qpid broker, default: [%default]') + parser.add_option('-s', '--servicename', dest='servicename', type='string', default=DEFAULT_MOMQUERY_SERVICENAME, help='Name for this service, default: [%default]') parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') parser.add_option('-P', '--projects', dest='projects', action='store_true', help='get list of all projects') parser.add_option('-p', '--project_details', dest='project_details', type='int', help='get project details for mom object with given id') @@ -57,7 +57,7 @@ def main(): logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO if options.verbose else logging.WARN) - with 
+    with MoMQueryRPC(busname=options.busname, servicename=options.servicename, broker=options.broker) as rpc:
         if options.projects:
             projects = rpc.getProjects()
             for project in projects:
diff --git a/SAS/MoM/MoMQueryService/momqueryservice.py b/SAS/MoM/MoMQueryService/momqueryservice.py
index 81571c052a98bbcbab5714fe563819120d009578..5893a21bc8be99bece420655fab9ef4a13d51ede 100755
--- a/SAS/MoM/MoMQueryService/momqueryservice.py
+++ b/SAS/MoM/MoMQueryService/momqueryservice.py
@@ -28,7 +28,7 @@ from mysql.connector.errors import OperationalError
 from lofar.messaging import Service
 from lofar.messaging.Service import MessageHandlerInterface
 from lofar.common.util import waitForInterrupt
-from lofar.mom.momqueryservice.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME
+from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
 from lofar.common import dbcredentials
 
 logger=logging.getLogger(__file__)
@@ -185,8 +185,8 @@ class ProjectDetailsQueryHandler(MessageHandlerInterface):
     def getProjects(self):
         return self.momdb.getProjects()
 
-def createService(busname=DEFAULT_BUSNAME,
-                  servicename=DEFAULT_SERVICENAME,
+def createService(busname=DEFAULT_MOMQUERY_BUSNAME,
+                  servicename=DEFAULT_MOMQUERY_SERVICENAME,
                   dbcreds=None,
                   handler=None,
                   broker=None):
@@ -217,8 +217,8 @@ def main():
     parser = OptionParser("%prog [options]", description='runs the momqueryservice')
     parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
-    parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, [default: %default]")
-    parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME, help="Name for this service, [default: %default]")
+    parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_MOMQUERY_BUSNAME, help="Name of the bus exchange on the qpid broker, [default: %default]")
+    parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_MOMQUERY_SERVICENAME, help="Name for this service, [default: %default]")
     parser.add_option_group(dbcredentials.options_group(parser))
     parser.set_defaults(dbcredentials="MoM")
     (options, args) = parser.parse_args()
diff --git a/SAS/MoM/MoMQueryService/test/test_momqueryservice.py b/SAS/MoM/MoMQueryService/test/test_momqueryservice.py
index 0f160df6d0b44507278c40d0835b7f23c356b3d6..d9a918f8db60e038fd2aca3e0e0bbb033e3a9334 100755
--- a/SAS/MoM/MoMQueryService/test/test_momqueryservice.py
+++ b/SAS/MoM/MoMQueryService/test/test_momqueryservice.py
@@ -24,8 +24,8 @@ import uuid
 import logging
 from lofar.mom.momqueryservice.momqueryservice import createService
 from lofar.mom.momqueryservice.momqueryservice import ProjectDetailsQueryHandler
-from lofar.mom.momqueryservice.momqueryrpc import MoMRPC
-from lofar.mom.momqueryservice.config import DEFAULT_SERVICENAME
+from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
+from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_SERVICENAME
 from qpid.messaging import Connection
 from qpidtoollibs import BrokerAgent
 
@@ -56,7 +56,7 @@ try:
 
     # inject the mock into the service
     with createService(busname, handler=MockProjectDetailsQueryHandler), \
-         MoMRPC(busname, DEFAULT_SERVICENAME) as momrpc:
+         MoMQueryRPC(busname, DEFAULT_MOMQUERY_SERVICENAME) as momrpc:
 
         class TestLTAStorageDb(unittest.TestCase):
             def testProjectDetailsQuery(self):
diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py
index 975992045a7aeea59e0dea1a839114f02555406c..725f248142e515c3fe3b74fc949e9276573075c8 100755
--- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py
+++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py
@@ -40,9 +40,8 @@ from lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.otdbrpc im
 from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
 from lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.translator import RAtoOTDBTranslator
 
-from lofar.mom.momqueryservice.momqueryrpc import MoMRPC
-from lofar.mom.momqueryservice.config import DEFAULT_BUSNAME as DEFAULT_MOM_BUSNAME
-from lofar.mom.momqueryservice.config import DEFAULT_SERVICENAME as DEFAULT_MOM_SERVICENAME
+from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
+from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
 
 logger = logging.getLogger(__name__)
 
@@ -54,8 +53,8 @@ class RAtoOTDBPropagator():
                  radb_broker=None,
                  otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
                  otdb_servicename=DEFAULT_OTDB_SERVICENAME,
-                 mom_busname=DEFAULT_MOM_BUSNAME,
-                 mom_servicename=DEFAULT_MOM_SERVICENAME,
+                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
+                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                  otdb_broker=None,
                  mom_broker=None,
                  broker=None):
@@ -76,7 +75,7 @@ class RAtoOTDBPropagator():
         self.radbrpc = RADBRPC(busname=radb_busname, servicename=radb_servicename, broker=radb_broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now
         self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=otdb_broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now
-        self.momrpc = MoMRPC(busname=mom_busname, servicename=mom_servicename, broker=mom_broker)
+        self.momrpc = MoMQueryRPC(busname=mom_busname, servicename=mom_servicename, broker=mom_broker)
         self.translator = RAtoOTDBTranslator()
 
     def __enter__(self):
@@ -118,11 +117,14 @@
             return
 
         ra_info = self.getRAinfo(ra_id)
+        logger.info('RA info for ra_id=%s otdb_id=%s: %s' % (ra_id, otdb_id, ra_info))
+
         # check if this is a CEP4 task, or an old CEP2 task
         # at this moment the most simple check is to see if RA claimed (CEP4) storage
         # TODO: do proper check on cluster/storage/etc
         if not ra_info['storage']:
             logger.info("No (CEP4) storage claimed for ra_id=%s otdb_id=%s, skipping otdb specification update." % (ra_id, otdb_id))
+            return
 
         #get mom project name
         try:
@@ -145,7 +147,7 @@
         info = {}
         info["storage"] = {}
         task = self.radbrpc.getTask(ra_id)
-        claims = self.radbrpc.getResourceClaims(task_id=ra_id, extended=True, include_properties=True)
+        claims = self.radbrpc.getResourceClaims(task_ids=ra_id, extended=True, include_properties=True)
         for claim in claims:
             logger.debug("Processing claim: %s" % claim)
             if claim['resource_type_name'] == 'storage':
@@ -156,10 +158,11 @@
         return info
 
     def setOTDBinfo(self, otdb_id, otdb_info, otdb_status):
-        logger.info('Setting specticication and status (%s) for otdb_id %s' % (otdb_status, otdb_id))
         try:
+            logger.info('Setting specification for otdb_id %s: %s' % (otdb_id, otdb_info))
             self.otdbrpc.taskSetSpecification(otdb_id, otdb_info)
             self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"], otdb_info["LOFAR.ObsSW.Observation.stopTime"])
+            logger.info('Setting status (%s) for otdb_id %s' % (otdb_status, otdb_id))
             self.otdbrpc.taskSetStatus(otdb_id, otdb_status)
         except Exception as e:
             logger.error(e)
diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py
index 98b29ea356eba84252bf2bcdfd54c694486bd303..3f3c06fc681c371c6960aaf189c9cc8cb4cd3e00 100755
--- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py
+++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py
@@ -96,8 +96,7 @@ def main():
     from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
     from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
-    from lofar.mom.momqueryservice.config import DEFAULT_BUSNAME as DEFAULT_MOM_BUSNAME
-    from lofar.mom.momqueryservice.config import DEFAULT_SERVICENAME as DEFAULT_MOM_SERVICENAME
+    from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
 
     # Check the invocation arguments
 
@@ -110,8 +109,8 @@ def main():
     parser.add_option("--radb_servicename", dest="radb_servicename", type="string", default=RADB_SERVICENAME, help="Name of the RADB service, default: %default")
     parser.add_option("--otdb_busname", dest="otdb_busname", type="string", default=DEFAULT_OTDB_SERVICE_BUSNAME, help="Name of the bus on which the OTDB service listens, default: %default")
     parser.add_option("--otdb_servicename", dest="otdb_servicename", type="string", default=DEFAULT_OTDB_SERVICENAME, help="Name of the OTDB service, default: %default")
-    parser.add_option("--mom_busname", dest="mom_busname", type="string", default=DEFAULT_MOM_BUSNAME, help="Name of the bus on which the MoM service listens, default: %default")
-    parser.add_option("--mom_servicename", dest="mom_servicename", type="string", default=DEFAULT_MOM_SERVICENAME, help="Name of the MoM service, default: %default")
+    parser.add_option("--mom_busname", dest="mom_busname", type="string", default=DEFAULT_MOMQUERY_BUSNAME, help="Name of the bus on which the MoM service listens, default: %default")
+    parser.add_option("--mom_servicename", dest="mom_servicename", type="string", default=DEFAULT_MOMQUERY_SERVICENAME, help="Name of the MoM service, default: %default")
     parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
     (options, args) = parser.parse_args()
diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py
index 9ac56974fe0240008e4b5b745e07ba49d9cb7d3b..3a70d827f69b5e6f71a0ed7ec00dd9afd6032f96 100755
--- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py
+++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py
@@ -29,6 +29,8 @@ import logging
 from lofar.common.util import to_csv_string
 from math import ceil, floor
 
+from lofar.common import isProductionEnvironment, isTestEnvironment
+
 #from lofar.parameterset import parameterset
 
 logger = logging.getLogger(__name__)
@@ -43,6 +45,20 @@ class RAtoOTDBTranslator():
     RAtoOTDBTranslator translates values from the RADB into parset keys to be stored in an OTDB Tree
     """
 
+    def cep4DataPath(self):
+        '''returns the path to the data dir on CEP4, depending on environment'''
+        if isProductionEnvironment():
+            return "CEP4:/data/projects"
+
+        if isTestEnvironment():
+            return "CEP4:/data/test-projects"
+
+        return "CEP4:/data/dev-projects"
+
+    def locationPath(self, project_name, otdb_id):
+        '''returns the full path to the data dir on CEP4 for given project and otdb_id, depending on environment'''
+        return "%s/%s/L%d" % (self.cep4DataPath(), project_name, otdb_id)
+
     def CreateCorrelated(self, otdb_id, storage_properties, project_name):
         sb_nr = 0
         locations = []
@@ -52,7 +68,7 @@
             logging.debug('processing sap: %s' % sap)
             if "nr_of_uv_files" in sap['properties']:
                 for _ in xrange(sap['properties']['nr_of_uv_files']):
-                    locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id))
+                    locations.append(self.locationPath(project_name, otdb_id))
                     filenames.append("L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sap['sap_nr'], sb_nr))
                     sb_nr += 1
         result[PREFIX + 'DataProducts.Output_Correlated.locations'] = '[' + to_csv_string(locations) + ']'
@@ -73,7 +89,7 @@
             for tab in xrange(nr_tabs):
                 for stokes in xrange(nr_stokes):
                     for part in xrange(nr_parts):
-                        locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id))
+                        locations.append(self.locationPath(project_name, otdb_id))
                         filenames.append("L%d_SAP%03d_B%03d_S%d_P%03d_bf.h5" % (otdb_id, sap['sap_nr'], tab, stokes, part))
         result[PREFIX + 'DataProducts.Output_CoherentStokes.locations'] = '[' + to_csv_string(locations) + ']'
         result[PREFIX + 'DataProducts.Output_CoherentStokes.filenames'] = '[' + to_csv_string(filenames) + ']'
@@ -93,7 +109,7 @@
             for tab in xrange(nr_tabs):
                 for stokes in xrange(nr_stokes):
                     for part in xrange(nr_parts):
-                        locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id))
+                        locations.append(self.locationPath(project_name, otdb_id))
                         filenames.append("L%d_SAP%03d_B%03d_S%d_P%03d_bf.h5" % (otdb_id, sap['sap_nr'], tab, stokes, part))
         result[PREFIX + 'DataProducts.Output_IncoherentStokes.locations'] = '[' + to_csv_string(locations) + ']'
         result[PREFIX + 'DataProducts.Output_IncoherentStokes.filenames'] = '[' + to_csv_string(filenames) + ']'
@@ -107,7 +123,7 @@
         for sap in storage_properties["saps"]: ##We might need to sort saps?
if "nr_of_im_files" in sap['properties']: for _ in range(sap['properties']['nr_of_im_files']): - locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id)) + locations.append(self.locationPath(project_name, otdb_id)) filenames.append("L%d_SAP%03d_SB%03d_inst.INST" % (otdb_id, sap['sap_nr'], sb_nr)) result[PREFIX + 'DataProducts.Output_InstrumentModel.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_InstrumentModel.filenames'] = '[' + to_csv_string(filenames) + ']' @@ -121,7 +137,7 @@ class RAtoOTDBTranslator(): for sap in storage_properties["saps"]: ##We might need to sort saps? if "nr_of_img_files" in sap['properties']: for _ in range(sap['properties']['nr_of_img_files']): - locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id)) + locations.append(self.locationPath(project_name, otdb_id)) filenames.append("L%d_SAP%03d_SB%03d_sky.IM" % (otdb_id, sap['sap_nr'], sb_nr)) result[PREFIX + 'DataProducts.Output_SkyImage.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_SkyImage.filenames'] = '[' + to_csv_string(filenames) + ']' @@ -135,7 +151,7 @@ class RAtoOTDBTranslator(): for sap in storage_properties["saps"]: ##We might need to sort saps? if "nr_of_uv_files" in sap['properties']: for _ in range(sap['properties']['nr_of_pulp_files']): - locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id)) + locations.append(self.locationPath(project_name, otdb_id)) filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sap['sap_nr'], sb_nr)) result[PREFIX + 'DataProducts.Output_Pulsar.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_Pulsar.filenames'] = '[' + to_csv_string(filenames) + ']' diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index 0675e1c8546288fa666707ba57129b7182138716..150d6541f8fef1d9ca02797bddb0c4e322009d71 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -103,14 +103,20 @@ class ResourceAssigner(): otdb_id = specification_tree['otdb_id'] taskType = specification_tree.get('task_type', '').lower() - status = specification_tree.get('state', 'prescheduled').lower() + status = specification_tree.get('state', '').lower() + + if status not in ['approved', 'prescheduled']: # cep2 accepts both, cep4 only prescheduled, see below + logger.info('skipping specification for otdb_id=%s because status=%s', (otdb_id, status)) #parse main parset... 
         mainParset = parameterset(specification_tree['specification'])
         momId = mainParset.getInt('Observation.momID', -1)
-        startTime = datetime.strptime(mainParset.getString('Observation.startTime'), '%Y-%m-%d %H:%M:%S')
-        endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S')
+        try:
+            startTime = datetime.strptime(mainParset.getString('Observation.startTime'), '%Y-%m-%d %H:%M:%S')
+            endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S')
+        except ValueError:
+            logger.warning('cannot parse start/stop time from specification for otdb_id=%s', otdb_id)
+            return
 
         # insert new task and specification in the radb
         # any existing specification and task with same otdb_id will be deleted automatically
@@ -128,12 +134,10 @@
         # do not assign resources to task for other clusters than cep4
         if not self.checkClusterIsCEP4(mainParset):
-            try:
-                #apply the most recent otdb status to this radb task
-                otdb_status = self.otdbrpc.taskGetStatus(otdb_id)
-                self.radbrpc.updateTaskStatusForOtdbId(otdb_id, otdb_status)
-            except Exception as e:
-                logger.error(e)
+            return
+
+        if status != 'prescheduled':
+            logger.info('skipping resource assignment for CEP4 task otdb_id=%s because status=%s', otdb_id, status)
             return
 
         needed = self.getNeededResouces(specification_tree)
diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py
index 9bc14f4c0566318327ed3f47e12fd40729178a18..75d70775208d3a74b8aa5d02db9158b457d3d911 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py
@@ -821,6 +821,9 @@ class RADatabase:
 
         claims = list(self._executeQuery(query, qargs, fetch=_FETCH_ALL))
 
+        if self.log_queries:
+            logger.info("found %s claims" % len(claims))
+
         if include_properties and claims:
             claimDict = {c['id']:c for c in claims}
             claim_ids = claimDict.keys()
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py
index a03b616714988c5f739846e1290ecff208543b01..f1d9cf31031bc1be077172ae08b7585512385c51 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py
@@ -30,9 +30,6 @@ Typical usage is to derive your own subclass from RADBChangesHandler and impleme
 from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_BUSNAME, DEFAULT_NOTIFICATION_SUBJECTS
 from lofar.sas.resourceassignment.database.radbbuslistener import RADBBusListener
 from lofar.common.util import waitForInterrupt
-from lofar.mom.momqueryservice.momqueryrpc import MoMRPC
-from lofar.mom.momqueryservice.config import DEFAULT_BUSNAME as DEFAULT_MOM_BUSNAME
-from lofar.mom.momqueryservice.config import DEFAULT_SERVICENAME as DEFAULT_MOM_SERVICENAME
 from lofar.sas.resourceassignment.resourceassignmenteditor.mom import updateTaskMomDetails
 import qpid.messaging
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py
index 3d99008973401eec49189110244bd7e62961e4dd..221c4a040647301d59e0159405a130ba0f73ac53 100755
--- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py
@@ -47,9 +47,8 @@ from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_SU
 from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
 from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RADB_BUSNAME
 from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as DEFAULT_RADB_SERVICENAME
-from lofar.mom.momqueryservice.momqueryrpc import MoMRPC
-from lofar.mom.momqueryservice.config import DEFAULT_BUSNAME as DEFAULT_MOM_BUSNAME
-from lofar.mom.momqueryservice.config import DEFAULT_SERVICENAME as DEFAULT_MOM_SERVICENAME
+from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
+from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
 from lofar.sas.resourceassignment.resourceassignmenteditor.mom import updateTaskMomDetails
 #from lofar.sas.resourceassignment.resourceassigner. import updateTaskMomDetails
@@ -322,8 +321,8 @@ def main():
     parser.add_option('--radb_servicename', dest='radb_servicename', type='string', default=DEFAULT_RADB_SERVICENAME, help='Name of the radbservice, default: %default')
     parser.add_option('--radb_notification_busname', dest='radb_notification_busname', type='string', default=DEFAULT_RADB_CHANGES_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the radb notifications are published, default: %default')
     parser.add_option('--radb_notification_subjects', dest='radb_notification_subjects', type='string', default=DEFAULT_RADB_CHANGES_SUBJECTS, help='Subject(s) to listen for on the radb notification bus exchange on the qpid broker, default: %default')
-    parser.add_option('--mom_busname', dest='mom_busname', type='string', default=DEFAULT_MOM_BUSNAME, help='Name of the bus exchange on the qpid broker on which the momservice listens, default: %default')
-    parser.add_option('--mom_servicename', dest='mom_servicename', type='string', default=DEFAULT_MOM_SERVICENAME, help='Name of the momservice, default: %default')
+    parser.add_option('--mom_busname', dest='mom_busname', type='string', default=DEFAULT_MOMQUERY_BUSNAME, help='Name of the bus exchange on the qpid broker on which the momservice listens, default: %default')
+    parser.add_option('--mom_servicename', dest='mom_servicename', type='string', default=DEFAULT_MOMQUERY_SERVICENAME, help='Name of the momservice, default: %default')
     parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
     (options, args) = parser.parse_args()
 
@@ -333,7 +332,7 @@ def main():
     global rarpc
     rarpc = RARPC(busname=DEFAULT_RADB_BUSNAME, servicename=DEFAULT_RADB_SERVICENAME, broker=options.broker)
     global momrpc
-    momrpc = MoMRPC(busname=DEFAULT_MOM_BUSNAME, servicename=DEFAULT_MOM_SERVICENAME, timeout=2.5, broker=options.broker)
+    momrpc = MoMQueryRPC(busname=DEFAULT_MOMQUERY_BUSNAME, servicename=DEFAULT_MOMQUERY_SERVICENAME, timeout=2.5, broker=options.broker)
     global radbchangeshandler
     radbchangeshandler = RADBChangesHandler(DEFAULT_RADB_CHANGES_BUSNAME, broker=options.broker, momrpc=momrpc)
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
index d1a9701a01818aee3a3c40f7c0e206b1f3da387a..45b18dc87c7d762be6509a695284bba48e4dc4ce 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
@@ -55,14 +55,15 @@ class BaseResourceEstimator(object):
         startTime = self._getDateTime(start)
         endTime = self._getDateTime(end)
         if startTime >= endTime:
+            logger.warning("startTime is after endTime")
             return 1 ##TODO To prevent divide by zero later
         return totalSeconds(endTime - startTime)
         #TODO check if this makes duration = int(parset.get('duration', 0)) as a key reduntant?
 
     def _calculate(self, parset, input_files={}):
-        raise NotImplementedError('estimate() in base class is called. Please implement estimate() in your subclass')
+        raise NotImplementedError('_calculate() in base class is called. Please implement _calculate() in your subclass')
 
-    def estimate(self, parset, input_files={}):
+    def verify_and_estimate(self, parset, input_files={}):
         """ Create estimates for a single process based on its parset and input files"""
         if self._checkParsetForRequiredKeys(parset):
             estimates = self._calculate(parameterset(parset), input_files)
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
index d3b6b98dd1e998657e08f0794e76a4b74e91cb12..96f4b4044ee5d428c106f6cfcfce12d7c21d7ae9 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
@@ -23,50 +23,82 @@ import logging
 from math import ceil
 from base_resource_estimator import BaseResourceEstimator
-from lofar.parameterset import parameterset
 
 logger = logging.getLogger(__name__)
 
-class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
-    """ CalibrationPipelineResourceEstimator
+DATAPRODUCTS = "Observation.DataProducts."
+PIPELINE = "Observation.ObservationControl.PythonControl."
+
+#Observation.DataProducts.Output_Correlated.storageClusterName=
+class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
+    """ ResourceEstimator for Calibration Pipelines
     """
     def __init__(self, kwargs, input_files):
+        logger.info("init CalibrationPipelineResourceEstimator")
         BaseResourceEstimator.__init__(self, name='calibration_pipeline')
-        self.required_keys = ('correlated.enabled', 'correlated.demixing_settings.freq_step',
-                              'correlated.demixing_settings.time_step', 'instrument_model.enabled')
+        self.required_keys = ('Observation.startTime',
+                              'Observation.stopTime',
+                              DATAPRODUCTS + 'Input_Correlated.enabled',
+                              DATAPRODUCTS + 'Output_InstrumentModel.enabled',
+                              DATAPRODUCTS + 'Output_Correlated.enabled',
+                              PIPELINE + 'DPPP.demixer.freqstep',
+                              PIPELINE + 'DPPP.demixer.timestep')
 
     def _calculate(self, parset, input_files):
-        """ Estimate for calibration pipeline
+        """ Estimate for CalibrationPipeline.
+            Also gets used for AveragingPipeline
         calculates: datasize (number of files, file size), bandwidth
+
+            input_files should look something like:
+            'input_files':
+            {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}, ...}
+
+            reply is something along the lines of:
+            {'bandwidth': {'total_size': 19021319494},
+             'storage': {'total_size': 713299481024,
+              'output_files':
+              {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104},
+               'im': {'nr_of_im_files': 481, 'im_file_size': 148295}
+              }}}
         """
         logger.debug("start estimate '{}'".format(self.name))
-        parset = parameterset(parset).makeSubset('dp.output')
-        output_files = {}
-        duration = int(kwargs.get('observation.duration', 1))
-        if 'dp_correlated_uv' in input_files:
-            if parset['correlated']['enabled'] == 'true':
-                logger.debug("calculate correlated data size")
-                freq_step = int(parset['correlated']['demixing_settings']['freq_step'])
-                time_step = int(parset['correlated']['demixing_settings']['time_step'])
-                reduction_factor = freq_step * time_step
-                input_file_size = int(input_files['dp_correlated_uv']['file_size'])
-                output_file_size = 0.0
-                if reduction_factor > 0:
-                    new_size = input_file_size / reduction_factor
-                    output_file_size = new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0
-                output_files['dp_correlated_uv'] = {'nr_files': int(input_files['dp_correlated_uv']['nr_files']), 'file_size': int(output_file_size)}
-                logger.debug("dp_correlated_uv: {} files {} bytes each".format(int(input_files['dp_correlated_uv']['nr_files']), int(output_file_size)))
+        logger.info('parset: %s ' % parset)
+        result = {'errors': []}
+        duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
+        freq_step = parset.getInt(PIPELINE + 'DPPP.demixer.freqstep', 1) #TODO, should these have defaults?
+        time_step = parset.getInt(PIPELINE + 'DPPP.demixer.timestep', 1)
+        reduction_factor = freq_step * time_step
+
+        if not parset.getBool(DATAPRODUCTS + 'Output_Correlated.enabled'):
+            logger.warning('Output_Correlated is not enabled')
+            result['errors'].append('Output_Correlated is not enabled')
+        if not 'uv' in input_files:
+            logger.warning('Missing UV Dataproducts in input_files')
+            result['errors'].append('Missing UV Dataproducts in input_files')
+        if reduction_factor < 1:
+            logger.warning('freqstep * timestep is not positive')
+            result['errors'].append('freqstep * timestep is not positive')
+        if result['errors']:
+            return result
+
+        logger.debug("calculate correlated data size")
+        result['output_files'] = {}
+        input_file_size = input_files['uv']['uv_file_size']
+        output_file_size = 0.0
+        new_size = input_file_size / float(reduction_factor)
+        output_file_size = new_size + new_size / 64.0 * (1.0 + reduction_factor) + new_size / 2.0
+        result['output_files']['uv'] = {'nr_of_uv_files': input_files['uv']['nr_of_uv_files'], 'uv_file_size': int(output_file_size)}
+        logger.debug("correlated_uv: {} files {} bytes each".format(result['output_files']['uv']['nr_of_uv_files'], result['output_files']['uv']['uv_file_size']))
 
-        if parset['instrument_model']['enabled'] == 'true':
-            logger.debug("calculate instrument-model data size")
-            output_files['dp_instrument_model'] = {'nr_files': int(input_files['dp_correlated_uv']['nr_files']), 'file_size': 1000}
-            logger.debug("dp_instrument_model: {} files {} bytes each".format(int(input_files['dp_correlated_uv']['nr_files']), 1000))
+        if parset.getBool(DATAPRODUCTS + 'Output_InstrumentModel.enabled'):
+            logger.debug("calculate instrument-model data size")
+            result['output_files']['im'] = {'nr_of_im_files': input_files['uv']['nr_of_uv_files'], 'im_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+            logger.debug("instrument_model: {} files {} bytes each".format(result['output_files']['im']['nr_of_im_files'], result['output_files']['im']['im_file_size']))
 
-        # count total data size
-        total_data_size = 0
-        for values in output_files.itervalues():
-            total_data_size += values['nr_files'] * values['file_size']
-        total_bandwidth = ceil((self.total_data_size * 8) / duration) # bits/second
-        return {"total_data_size":total_data_size, "total_bandwidth":total_bandwidth, "output_files":output_files}
+        # count total data size; the instrument-model files are only present when enabled
+        total_data_size = result['output_files']['uv']['nr_of_uv_files'] * result['output_files']['uv']['uv_file_size'] # bytes
+        if 'im' in result['output_files']:
+            total_data_size += result['output_files']['im']['nr_of_im_files'] * result['output_files']['im']['im_file_size']
+        total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
+        result['storage'] = {'total_size': total_data_size}
+        result['bandwidth'] = {'total_size': total_bandwidth}
+        return result
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
index e3d239100b7edcf45e9989e5f05924080545ad65..1157dc3601884831ee552b9abd8d262f91975306 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
@@ -27,40 +27,73 @@ from lofar.parameterset import parameterset
 
 logger = logging.getLogger(__name__)
 
-class ImagePipelineResourceEstimator(BaseResourceEstimator):
-    """ ImagePipelineResourceEstimator
+DATAPRODUCTS = "Observation.DataProducts."
+PIPELINE = "Observation.ObservationControl.PythonControl."
+
+#Observation.DataProducts.Output_Correlated.storageClusterName=
+#Observation.ObservationControl.PythonControl.AWimager
+class ImagePipelineResourceEstimator(BaseResourceEstimator):
+    """ ResourceEstimator for Imaging Pipelines
     """
     def __init__(self, kwargs, input_files):
-        BaseResourceEstimator.__init__(self, name='image_pipeline')
-        self.parset = ParameterSet(kwargs).make_subset('dp.output')
-        self.duration = int(kwargs.get('observation.duration', 1))
-        self.input_files = input_files
-        self.required_keys = ('skyimage.enabled', 'skyimage.slices_per_image', 'skyimage.subbands_per_image')
-        if self.checkParsetForRequiredKeys():
-            self.estimate()
-        return
+        logger.info("init ImagePipelineResourceEstimator")
+        BaseResourceEstimator.__init__(self, name='imaging_pipeline')
+        self.required_keys = ('Observation.startTime',
+                              'Observation.stopTime',
+                              DATAPRODUCTS + 'Input_Correlated.enabled',
+                              DATAPRODUCTS + 'Output_SkyImage.enabled',
+                              PIPELINE + 'Imaging.slices_per_image',
+                              PIPELINE + 'Imaging.subbands_per_image')
 
-    def estimate(self):
-        """ Estimate for image pipeline
+    def _calculate(self, parset, input_files):
+        """ Estimate for Imaging Pipeline. Also gets used for MSSS Imaging Pipeline
         calculates: datasize (number of files, file size), bandwidth
+
+            input_files should look something like:
+            'input_files':
+            {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}, ...}
+
+            reply is something along the lines of:
+            {'bandwidth': {'total_size': 19021319494},
+             'storage': {'total_size': 713299481024,
+              'output_files':
+              {'img': {'nr_of_img_files': 481, 'img_file_size': 148295}
+              }}}
         """
         logger.debug("start estimate '{}'".format(self.name))
-        if 'dp_correlated_uv' in self.input_files:
-            if self.parset['skyimage']['enabled'] == 'true':
-                logger.debug("calculate skyimage data size")
-                slices_per_image = int(self.parset['skyimage']['slices_per_image'])
-                subbands_per_image = int(self.parset['skyimage']['subbands_per_image'])
-                if slices_per_image and subbands_per_image:
-                    nr_input_subbands = int(self.input_files['dp_correlated_uv']['nr_files'])
-                    if (nr_input_subbands % (subbands_per_image * slices_per_image)) == 0:
-                        nr_images = nr_input_subbands / (subbands_per_image * slices_per_image)
-                        self.output_files['dp_sky_image'] = {'nr_files': nr_images, 'file_size': 1000}
-                        logger.debug("dp_sky_image: {} files {} bytes each".format(nr_images, 1000))
+        logger.info('parset: %s ' % parset)
+        result = {'errors': []}
+        duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
+        slices_per_image = parset.getInt(PIPELINE + 'Imaging.slices_per_image', 0) #TODO, should these have defaults?
+        subbands_per_image = parset.getInt(PIPELINE + 'Imaging.subbands_per_image', 0)
+
+        if not parset.getBool(DATAPRODUCTS + 'Output_SkyImage.enabled'):
+            logger.warning('Output_SkyImage is not enabled')
+            result['errors'].append('Output_SkyImage is not enabled')
+        if not 'uv' in input_files:
+            logger.warning('Missing UV Dataproducts in input_files')
+            result['errors'].append('Missing UV Dataproducts in input_files')
+        else:
+            nr_input_subbands = input_files['uv']['nr_of_uv_files']
+            if not slices_per_image or not subbands_per_image:
+                logger.warning('slices_per_image or subbands_per_image are not valid')
+                result['errors'].append('slices_per_image or subbands_per_image are not valid')
+            elif nr_input_subbands % (subbands_per_image * slices_per_image) > 0:
+                logger.warning('slices_per_image and subbands_per_image not a multiple of number of inputs')
+                result['errors'].append('slices_per_image and subbands_per_image not a multiple of number of inputs')
+        if result['errors']:
+            return result
+
+        logger.debug("calculate sky image data size")
+        result['output_files'] = {}
+        nr_images = nr_input_subbands / (subbands_per_image * slices_per_image)
+        result['output_files']['img'] = {'nr_of_img_files': nr_images, 'img_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+        logger.debug("sky_images: {} files {} bytes each".format(result['output_files']['img']['nr_of_img_files'], result['output_files']['img']['img_file_size']))
 
-        # count total data size
-        for values in self.output_files.itervalues():
-            self.total_data_size += values['nr_files'] * values['file_size']
-        self.total_bandwidth = ceil((self.total_data_size * 8) / self.duration) # bits/second
-        return
+        # count total data size
+        total_data_size = result['output_files']['img']['nr_of_img_files'] * result['output_files']['img']['img_file_size'] # bytes
+        total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
+        result['storage'] = {'total_size': total_data_size}
+        result['bandwidth'] = {'total_size': total_bandwidth}
+        return result
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
index eb52034868de6d720f577a0728ae8c94047f426d..9adffa1a555851b3533ab7216aa8c51307cd151a 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
@@ -23,48 +23,74 @@ import logging
 from math import ceil
 from base_resource_estimator import BaseResourceEstimator
-from lofar.parameterset import parameterset
 
 logger = logging.getLogger(__name__)
 
+DATAPRODUCTS = "Observation.DataProducts."
+PIPELINE = "Observation.ObservationControl.PythonControl."
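+# note: DATAPRODUCTS and PIPELINE are the shared OTDB parset key prefixes used by
+# all of these estimators; a full key then looks like, for example:
+#   Observation.DataProducts.Input_Correlated.enabled
+#   Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms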
+
+#Observation.DataProducts.Output_Correlated.storageClusterName=
 class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
-    """ LongBaselinePipelineResourceEstimator
+    """ ResourceEstimator for Long Baseline Pipelines
     """
     def __init__(self, kwargs, input_files):
+        logger.info("init LongBaselinePipelineResourceEstimator")
         BaseResourceEstimator.__init__(self, name='longbaseline_pipeline')
-        logger.debug("init LongBaselinePipelineResourceEstimator")
-        self.parset = ParameterSet(kwargs).make_subset('dp.output')
-        self.duration = int(kwargs.get('observation.duration', 1))
-        self.input_files = input_files
-        self.required_keys = ('correlated.enabled', 'longbaseline.subband_groups_per_ms',
-                              'longbaseline.subbands_per_subband_group')
-        if self.checkParsetForRequiredKeys():
-            self.estimate()
-        return
+        self.required_keys = ('Observation.startTime',
+                              'Observation.stopTime',
+                              DATAPRODUCTS + 'Input_Correlated.enabled',
+                              DATAPRODUCTS + 'Output_Correlated.enabled',
+                              PIPELINE + 'LongBaseline.subbandgroups_per_ms',
+                              PIPELINE + 'LongBaseline.subbands_per_subbandgroup')
 
-    def estimate(self):
-        """ Estimate for calibration pipeline
+    def _calculate(self, parset, input_files):
+        """ Estimate for Long Baseline Pipeline
         calculates: datasize (number of files, file size), bandwidth
+
+            input_files should look something like:
+            'input_files':
+            {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}, ...}
+
+            reply is something along the lines of:
+            {'bandwidth': {'total_size': 19021319494},
+             'storage': {'total_size': 713299481024,
+              'output_files':
+              {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104}
+              }}}
         """
         logger.debug("start estimate '{}'".format(self.name))
-        if 'dp_correlated_uv' in self.input_files:
-            if self.parset['correlated']['enabled'] == 'true':
-                logger.debug("calculate long baseline data size")
+        logger.info('parset: %s ' % parset)
+        result = {'errors': []}
+        duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
+        subbandgroups_per_ms = parset.getInt(PIPELINE + 'LongBaseline.subbandgroups_per_ms', 0) #TODO, should these have defaults?
+        subbands_per_subbandgroup = parset.getInt(PIPELINE + 'LongBaseline.subbands_per_subbandgroup', 0)
 
-            nr_output_files = 0
-            subband_groups_per_ms = int(self.parset['longbaseline']['subband_groups_per_ms'])
-            subbands_per_subband_group = int(self.parset['longbaseline']['subbands_per_subband_group'])
-            if subband_groups_per_ms and subbands_per_subband_group:
-                nr_input_files = int(self.input_files['dp_correlated_uv']['nr_files'])
-                if (nr_input_files % (subbands_per_subband_group * subband_groups_per_ms)) == 0:
-                    nr_output_files = nr_input_files / (subbands_per_subband_group * subband_groups_per_ms)
-                    self.output_files['dp_correlated_uv'] = {'nr_files': nr_output_files, 'file_size': 1000}
-                    logger.debug("dp_correlated_uv: {} files {} bytes each".format(nr_output_files, 1000))
+        if not parset.getBool(DATAPRODUCTS + 'Output_Correlated.enabled'):
+            logger.warning('Output_Correlated is not enabled')
+            result['errors'].append('Output_Correlated is not enabled')
+        if not 'uv' in input_files:
+            logger.warning('Missing UV Dataproducts in input_files')
+            result['errors'].append('Missing UV Dataproducts in input_files')
+        else:
+            nr_input_files = input_files['uv']['nr_of_uv_files']
+            if not subbandgroups_per_ms or not subbands_per_subbandgroup:
+                logger.warning('subbandgroups_per_ms or subbands_per_subbandgroup are not valid')
+                result['errors'].append('subbandgroups_per_ms or subbands_per_subbandgroup are not valid')
+            elif nr_input_files % (subbands_per_subbandgroup * subbandgroups_per_ms) > 0:
+                logger.warning('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs')
+                result['errors'].append('subbandgroups_per_ms and subbands_per_subbandgroup not a multiple of number of inputs')
+        if result['errors']:
+            return result
 
-        # count total data size
-        for values in self.output_files.itervalues():
-            self.total_data_size += values['nr_files'] * values['file_size']
-        self.total_bandwidth = ceil((self.total_data_size * 8) / self.duration) # bits/second
-        return
+        logger.debug("calculate correlated data size")
+        result['output_files'] = {}
+        nr_output_files = nr_input_files / (subbands_per_subbandgroup * subbandgroups_per_ms)
+        result['output_files']['uv'] = {'nr_of_uv_files': nr_output_files, 'uv_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+        logger.debug("correlated_uv: {} files {} bytes each".format(result['output_files']['uv']['nr_of_uv_files'], result['output_files']['uv']['uv_file_size']))
 
+        # count total data size
+        total_data_size = result['output_files']['uv']['nr_of_uv_files'] * result['output_files']['uv']['uv_file_size'] # bytes
+        total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
+        result['storage'] = {'total_size': total_data_size}
+        result['bandwidth'] = {'total_size': total_bandwidth}
+        return result
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
index 50b01555b0ede30943c986b29fddbe86b33c7ea6..e3e8e1622c7f1e28172caad8d2ceb804b77af5ff 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
@@ -53,6 +53,16 @@ class ObservationResourceEstimator(BaseResourceEstimator):
 
     def _calculate(self, parset, input_files={}):
         """ Calculate the combined resources needed by the different observation types that can be in a single observation.
+            reply is something along the lines of:
+            {'bandwidth': {'total_size': 19021319494},
+             'storage': {'total_size': 713299481024,
+              'output_files':
+              {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104},
+               'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}},
+                        {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}},
+                        {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}}
+                       ]}}}
+            The base_resource_estimator adds an {'observation': } around this.
         """
         logger.info("start estimate '{}'".format(self.name))
         logger.info('parset: %s ' % parset)
@@ -88,7 +98,8 @@ class ObservationResourceEstimator(BaseResourceEstimator):
         return result
 
     def correlated(self, parset, duration):
-        """ Estimate number of files, file size and bandwidth needed for correlated data"""
+        """ Estimate number of files, file size and bandwidth needed for correlated data
+        """
         logger.info("calculating correlated datasize")
         size_of_header = 512 #TODO More magic numbers (probably from Alwin). ScS needs to check these. They look ok though.
         size_of_overhead = 600000
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
index d907c2f2f2e51e56b158e996ee88c21e26410f0f..f3a8e12949f0d69f6e5f889384871aec9fbd6c65 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
@@ -23,49 +23,74 @@ import logging
 from math import ceil
 from base_resource_estimator import BaseResourceEstimator
-from lofar.parameterset import parameterset
 
 logger = logging.getLogger(__name__)
 
-class PulsarPipelineResourceEstimator(BaseResourceEstimator):
-    """ PulsarPipelineResourceEstimator
+DATAPRODUCTS = "Observation.DataProducts."
+PIPELINE = "Observation.ObservationControl.PythonControl."
+
+#Observation.DataProducts.Output_Correlated.storageClusterName=
+class PulsarPipelineResourceEstimator(BaseResourceEstimator):
+    """ ResourceEstimator for Pulsar Pipelines
     """
     def __init__(self, kwargs, input_files):
+        logger.info("init PulsarPipelineResourceEstimator")
         BaseResourceEstimator.__init__(self, name='pulsar_pipeline')
-        self.parset = ParameterSet(kwargs).make_subset('dp.output')
-        self.duration = int(kwargs.get('observation.duration', 1))
-        self.coherent_stokes_type = kwargs.get('observation.coherent_stokes.type')
-        self.input_files = input_files
-        self.required_keys = ('pulsar.enabled',)
-        if self.checkParsetForRequiredKeys():
-            self.estimate()
-        return
+        self.required_keys = ('Observation.startTime',
+                              'Observation.stopTime',
+                              DATAPRODUCTS + 'Input_CoherentStokes.enabled',
+                              DATAPRODUCTS + 'Input_IncoherentStokes.enabled',
+                              DATAPRODUCTS + 'Output_Pulsar.enabled')
 
-    def estimate(self):
-        """ Estimate for pulsar pipeline
+    def _calculate(self, parset, input_files):
+        """ Estimate for Pulsar Pipeline
         calculates: datasize (number of files, file size), bandwidth
+
+            input_files should look something like:
+            'input_files':
+            {'cs': {'nr_of_cs_files': 48, 'nr_of_cs_stokes': 4, 'cs_file_size': 1482104}, ...}
+
+            reply is something along the lines of:
+            {'bandwidth': {'total_size': 19021319494},
+             'storage': {'total_size': 713299481024,
+              'output_files':
+              {'pulp': {'nr_of_pulp_files': 48, 'pulp_file_size': 185104}
+              }}}
         """
         logger.debug("start estimate '{}'".format(self.name))
-        if self.parset['pulsar']['enabled'] == 'true':
-            logger.debug("calculate pulsar data size")
-            nr_output_files = 0
-            if 'dp_coherent_stokes' in self.input_files:
-                nr_input_files = int(self.input_files['dp_coherent_stokes']['nr_files'])
-                if self.coherent_stokes_type == 'DATA_TYPE_XXYY':
-                    nr_output_files += nr_input_files / 4
-                else:
-                    nr_output_files += nr_input_files
+        logger.info('parset: %s ' % parset)
+        result = {'errors': []}
+        duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
 
-            if 'dp_incoherent_stokes' in self.input_files:
-                nr_input_files = int(self.input_files['dp_incoherent_stokes']['nr_files'])
+        if not parset.getBool(DATAPRODUCTS + 'Output_Pulsar.enabled'):
+            logger.warning('Output_Pulsar is not enabled')
+            result['errors'].append('Output_Pulsar is not enabled')
+        if not 'cs' in input_files and not 'is' in input_files:
+            logger.warning('Missing Both CS and IS Dataproducts in input_files')
+            result['errors'].append('Missing Both CS and IS Dataproducts in input_files')
+        if result['errors']:
+            return result
+
+        logger.debug("calculate pulp data size")
+        result['output_files'] = {}
+        nr_output_files = 0
+        if 'cs' in input_files:
+            nr_input_files = input_files['cs']['nr_of_cs_files']
+            if input_files['cs']['nr_of_cs_stokes'] == 4: ##TODO Check if this is the same as coherent_stokes_type == 'XXYY'
+                nr_output_files += nr_input_files / 4 ## Then nr_output_files = nr_input_files / input_files['cs']['nr_of_cs_stokes']
+            else:
                 nr_output_files += nr_input_files
-            self.output_files['dp_pulsar'] = {'nr_files': nr_output_files, 'file_size': 1000}
-            logger.debug("dp_pulsar: {} files {} bytes each".format(nr_output_files, 1000))
 
+        if 'is' in input_files:
+            nr_input_files = input_files['is']['nr_of_is_files']
+            nr_output_files += nr_input_files
+
+        result['output_files']['pulp'] = {'nr_of_pulp_files': nr_output_files, 'pulp_file_size': 1000} # 1 kB was hardcoded in the Scheduler
+        logger.debug("pulp: {} files {} bytes each".format(result['output_files']['pulp']['nr_of_pulp_files'], result['output_files']['pulp']['pulp_file_size']))
 
-        # count total data size
-        for values in self.output_files.itervalues():
-            self.total_data_size += values['nr_files'] * values['file_size']
-        self.total_bandwidth = ceil((self.total_data_size * 8) / self.duration) # bits/second
-        return
+        # count total data size
+        total_data_size = result['output_files']['pulp']['nr_of_pulp_files'] * result['output_files']['pulp']['pulp_file_size'] # bytes
+        total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
+        result['storage'] = {'total_size': total_data_size}
+        result['bandwidth'] = {'total_size': total_bandwidth}
+        return result
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
index 342e180c4aabcd444aa6bb17f98dbca39c556e65..66b4b27b9f3d97e1f766a37ae44f1ff2a6250836 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
@@ -18,44 +18,74 @@ class ResourceEstimatorHandler(MessageHandlerInterface):
     def __init__(self, **kwargs):
         super(ResourceEstimatorHandler, self).__init__(**kwargs)
         self.observation = ObservationResourceEstimator()
-        #self.longbaseline_pipeline = LongBaselinePipelineResourceEstimator()
-        #self.calibration_pipeline = CalibrationPipelineResourceEstimator()
-        #self.pulsar_pipeline = PulsarPipelineResourceEstimator()
-        #self.imaging_pipeline = ImagePipelineResourceEstimator()
+        self.longbaseline_pipeline = LongBaselinePipelineResourceEstimator()
+        self.calibration_pipeline = CalibrationPipelineResourceEstimator()
+        self.pulsar_pipeline = PulsarPipelineResourceEstimator()
+        self.imaging_pipeline = ImagePipelineResourceEstimator()
 
     def handle_message(self, content):
         specification_tree = content["specification_tree"]
         return self._get_estimated_resources(specification_tree) ##TODO also handle MoM tasks in RA 1.2
 
-    #def _getPredecessors(self, parset):
-
-
-    def _get_estimated_resources(self, specification_tree):
-        logger.info('get_estimated_resources on: %s' % specification_tree)
-        result = {}
-
+    def get_subtree_estimate(self, specification_tree):
         otdb_id = specification_tree['otdb_id']
-        main_parset = specification_tree['specification']
+        parset = specification_tree['specification']
 
         if specification_tree['task_type'] == 'observation':
-            result[str(otdb_id)] = self.observation.estimate(main_parset)
-
-            #TODO: implement properly
-            #pipeline_input_files = result['observation']['output_files']
-
-            #longbaseline = LongBaselinePipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
-            #result.update(longbaseline.result_as_dict())
-
-            #calibration = CalibrationPipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
-            #result.update(calibration.result_as_dict())
-
-            #pulsar = PulsarPipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
-            #result.update(pulsar.result_as_dict())
-
-            #image = ImagePipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
-            #result.update(image.result_as_dict())
-
-        return result
+            return {str(otdb_id): self.observation.verify_and_estimate(parset)}
+        elif specification_tree['task_type'] == 'pipeline':
+            branch_estimates = {}
+            for branch in specification_tree['predecessors']:
+                branch_estimates.update(self.get_subtree_estimate(branch))
+
+            if specification_tree['task_subtype'] in ['averaging pipeline', 'calibration pipeline']:
+                for id, estimate in branch_estimates.items():
+                    if not 'im' in estimate['output_files'] and 'uv' in estimate['output_files']: # Not a calibrator pipeline
+                        logger.info('found %s as the target of pipeline %s' % (id, otdb_id))
+                        input_files = estimate['output_files'] # Need sap here as well
+                        return {str(otdb_id): self.calibration_pipeline.verify_and_estimate(parset, input_files)}
+
+            if specification_tree['task_subtype'] in ['imaging pipeline', 'imaging pipeline msss']:
+                if len(branch_estimates) > 1:
+                    logger.error('Imaging pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys() ) )
+                input_files = branch_estimates.items()[0][1]['output_files']
+                return {str(otdb_id): self.imaging_pipeline.verify_and_estimate(parset, input_files)}
+
+            if specification_tree['task_subtype'] in ['long baseline pipeline']:
+                if len(branch_estimates) > 1:
+                    logger.error('Long baseline pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys() ) )
+                input_files = branch_estimates.items()[0][1]['output_files']
+                return {str(otdb_id): self.longbaseline_pipeline.verify_and_estimate(parset, input_files)}
+
+            if specification_tree['task_subtype'] in ['pulsar pipeline']:
+                if len(branch_estimates) > 1:
+                    logger.error('Pulsar pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys() ) )
+                input_files = branch_estimates.items()[0][1]['output_files']
+                return {str(otdb_id): self.pulsar_pipeline.verify_and_estimate(parset, input_files)}
+
+        else: # reservation, maintenance, system tasks?
+            logger.info("It's not a pipeline or observation: %s" % otdb_id)
+            return {str(otdb_id): {}}
+
+    def _get_estimated_resources(self, specification_tree):
+        """ Input is like:
+            {"otdb_id": otdb_id, "state": 'prescheduled', 'specification': ...,
+             'task_type': "pipeline", 'task_subtype': "long baseline pipeline",
+             'predecessors': [...]}
+
+            reply is something along the lines of:
+            {'452648':
+              {'observation':
+                {'bandwidth': {'total_size': 19021319494},
+                 'storage': {'total_size': 713299481024,
+                  'output_files':
+                  {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104},
+                   'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}},
+                            {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}},
+                            {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}}
+                           ]}}}}}
+        """
+        logger.info('get_estimated_resources on: %s' % specification_tree)
+        return self.get_subtree_estimate(specification_tree)
 
 def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, broker=None):
     return Service(servicename=servicename,
diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py b/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py
index 9ad8a9b8190efec003cefae3ff80aaccb8fa0019..cff26544ec6771cefe10b22fc7aab15506504107 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py
@@ -36,15 +36,18 @@ class RARPC(RPCWrapper):
     def insertResourceClaimProperty(self, claim_id, property_type, value):
         return self.rpc('InsertResourceClaimProperty', claim_id=claim_id, property_type=property_type, value=value)
 
-    def getResourceClaims(self, claim_ids=None, lower_bound=None, upper_bound=None, task_id=None, status=None, resource_type=None, extended=False, include_properties=False):
+    def getResourceClaims(self, claim_ids=None, lower_bound=None, upper_bound=None, task_ids=None, status=None, resource_type=None, extended=False, include_properties=False):
         claims = self.rpc('GetResourceClaims', claim_ids=claim_ids,
                           lower_bound=lower_bound,
                           upper_bound=upper_bound,
-                          task_id=task_id,
+                          task_ids=task_ids,
                           status=status,
                           resource_type=resource_type,
                           extended=extended,
                           include_properties=include_properties)
+
+        logger.info("found %s claims" % len(claims))
+
         for claim in claims:
             claim['starttime'] = claim['starttime'].datetime()
             claim['endtime'] = claim['endtime'].datetime()