From fb77fdce1b9a6e94495325659e3e60dea379e1fb Mon Sep 17 00:00:00 2001 From: Adriaan Renting <renting@astron.nl> Date: Tue, 12 Apr 2016 13:52:07 +0000 Subject: [PATCH] Task #8886: Added project name in generated directory --- .../lib/propagator.py | 13 +++++- .../lib/rotspservice.py | 7 ++++ .../lib/translator.py | 42 +++++++++---------- 3 files changed, 40 insertions(+), 22 deletions(-) diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py index b8dcfd7ff35..c080a569d92 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py @@ -40,6 +40,10 @@ from lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.otdbrpc im from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME from lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.translator import RAtoOTDBTranslator +from lofar.mom.momqueryservice.momqueryrpc import MoMRPC +from lofar.mom.momqueryservice.config import DEFAULT_BUSNAME as DEFAULT_MOM_BUSNAME +from lofar.mom.momqueryservice.config import DEFAULT_SERVICENAME as DEFAULT_MOM_SERVICENAME + logger = logging.getLogger(__name__) @@ -50,6 +54,8 @@ class RAtoOTDBPropagator(): radb_broker=None, otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME, otdb_servicename=DEFAULT_OTDB_SERVICENAME, + mom_busname=DEFAULT_MOM_BUSNAME, + mom_servicename=DEFAULT_MOM_SERVICENAME, otdb_broker=None, broker=None): """ @@ -68,6 +74,7 @@ class RAtoOTDBPropagator(): self.radbrpc = RADBRPC(busname=radb_busname, servicename=radb_servicename, broker=radb_broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=otdb_broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now + self.momrpc = MoMRPC(busname=mom_busname, servicename=mom_servicename, broker=options.broker) self.translator = RAtoOTDBTranslator() def __enter__(self): @@ -83,11 +90,13 @@ class RAtoOTDBPropagator(): """Open rpc connections to radb service and resource estimator service""" self.radbrpc.open() self.otdbrpc.open() + self.momrpc.open() def close(self): """Close rpc connections to radb service and resource estimator service""" self.radbrpc.close() self.otdbrpc.close() + self.momrpc.close() def doTaskConflict(self, ra_id, otdb_id, mom_id): logger.info('doTaskConflict: otdb_id=%s mom_id=%s' % (otdb_id, mom_id)) @@ -102,7 +111,9 @@ class RAtoOTDBPropagator(): logger.warning('doTaskScheduled no valid otdb_id: otdb_id=%s' % (otdb_id,)) return ra_info = self.getRAinfo(ra_id) - otdb_info = self.translator.CreateParset(otdb_id, ra_info) + project = self.momrpc.getProjectDetails(mom_id) + project_name = "_".join(project['project_name'].split()) + otdb_info = self.translator.CreateParset(otdb_id, ra_info, project_name) logger.debug("Parset info for OTDB: %s" %otdb_info) self.setOTDBinfo(otdb_id, otdb_info, 'scheduled') diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py index 4bbb569a2f9..671faeb50b1 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py @@ -96,6 +96,9 @@ def main(): from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME + from lofar.mom.momqueryservice.config import DEFAULT_BUSNAME as DEFAULT_MOM_BUSNAME + from lofar.mom.momqueryservice.config import DEFAULT_SERVICENAME as DEFAULT_MOM_SERVICENAME + # Check the invocation arguments parser = OptionParser("%prog [options]", @@ -107,6 +110,8 @@ def main(): parser.add_option("--radb_servicename", dest="radb_servicename", type="string", default=RADB_SERVICENAME, help="Name of the RADB service, default: %default") parser.add_option("--otdb_busname", dest="otdb_busname", type="string", default=DEFAULT_OTDB_SERVICE_BUSNAME, help="Name of the bus on which the OTDB service listens, default: %default") parser.add_option("--otdb_servicename", dest="otdb_servicename", type="string", default=DEFAULT_OTDB_SERVICENAME, help="Name of the OTDB service, default: %default") + parser.add_option("--mom_busname", dest="mom_busname", type="string", default=DEFAULT_MOM_BUSNAME, help="Name of the bus on which the MoM service listens, default: %default") + parser.add_option("--mom_servicename", dest="mom_servicename", type="string", default=DEFAULT_MOM_SERVICENAME, help="Name of the MoM service, default: %default") parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') (options, args) = parser.parse_args() @@ -118,6 +123,8 @@ def main(): radb_servicename=options.radb_servicename, otdb_busname=options.otdb_busname, otdb_servicename=options.otdb_servicename, + mom_busname=options.mom_busname, + mom_servicename=options.mom_servicename, broker=options.broker) as propagator: with RATaskStatusChangedListener(busname=options.notification_busname, subject=options.notification_subject, diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py index b22bd484c4e..b985599dcd6 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py @@ -43,7 +43,7 @@ class RAtoOTDBTranslator(): RAtoOTDBTranslator translates values from the RADB into parset keys to be stored in an OTDB Tree """ - def CreateCorrelated(self, otdb_id, storage_properties): + def CreateCorrelated(self, otdb_id, storage_properties, project_name): sb_nr = 0 locations = [] filenames = [] @@ -52,14 +52,14 @@ class RAtoOTDBTranslator(): logging.debug('processing sap: %s' % sap) if "nr_of_uv_files" in sap['properties']: for _ in xrange(sap['properties']['nr_of_uv_files']): - locations.append("CEP4:/data/projects/test/L%d" % otdb_id) + locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id)) filenames.append("L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sap['sap_nr'], sb_nr)) sb_nr += 1 result[PREFIX + 'DataProducts.Output_Correlated.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_Correlated.filenames'] = '[' + to_csv_string(filenames) + ']' return result - def CreateCoherentStokes(self, otdb_id, storage_properties): + def CreateCoherentStokes(self, otdb_id, storage_properties, project_name): SB_nr = 0 locations = [] filenames = [] @@ -73,13 +73,13 @@ class RAtoOTDBTranslator(): for tab in xrange(nr_tabs): for stokes in xrange(nr_stokes): for part in xrange(nr_parts): - locations.append("CEP4:/data/projects/project/L%d" % otdb_id) + locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id)) filenames.append("L%d_SAP%03d_B%03d_S%d_P%03d_bf.h5" % (otdb_id, sap['sap_nr'], tab, stokes, part)) result[PREFIX + 'DataProducts.Output_CoherentStokes.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_CoherentStokes.filenames'] = '[' + to_csv_string(filenames) + ']' return result - def CreateIncoherentStokes(self, otdb_id, storage_properties): + def CreateIncoherentStokes(self, otdb_id, storage_properties, project_name): SB_nr = 0 locations = [] filenames = [] @@ -93,13 +93,13 @@ class RAtoOTDBTranslator(): for tab in xrange(nr_tabs): for stokes in xrange(nr_stokes): for part in xrange(nr_parts): - locations.append("CEP4:/data/projects/project/L%d" % otdb_id) + locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id)) filenames.append("L%d_SAP%03d_B%03d_S%d_P%03d_bf.h5" % (otdb_id, sap['sap_nr'], tab, stokes, part)) result[PREFIX + 'DataProducts.Output_IncoherentStokes.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_IncoherentStokes.filenames'] = '[' + to_csv_string(filenames) + ']' return result - def CreateCreateInstrumentModel(self, otdb_id, storage_properties): + def CreateCreateInstrumentModel(self, otdb_id, storage_properties, project_name): SB_nr = 0 locations = [] filenames = [] @@ -107,13 +107,13 @@ class RAtoOTDBTranslator(): for sap in storage_properties["saps"]: ##We might need to sort saps? if "nr_of_im_files" in sap['properties']: for _ in range(sap['properties']['nr_of_im_files']): - locations.append("CEP4:/data/projects/project/L%d" % otdb_id) + locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id)) filenames.append("L%d_SAP%03d_SB%03d_inst.INST" % (otdb_id, sap['sap_nr'], sb_nr)) result[PREFIX + 'DataProducts.Output_InstrumentModel.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_InstrumentModel.filenames'] = '[' + to_csv_string(filenames) + ']' return result - def CreateSkyImage(self, otdb_id, storage_properties): + def CreateSkyImage(self, otdb_id, storage_properties, project_name): SB_nr = 0 locations = [] filenames = [] @@ -121,13 +121,13 @@ class RAtoOTDBTranslator(): for sap in storage_properties["saps"]: ##We might need to sort saps? if "nr_of_img_files" in sap['properties']: for _ in range(sap['properties']['nr_of_img_files']): - locations.append("CEP4:/data/projects/project/L%d" % otdb_id) + locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id)) filenames.append("L%d_SAP%03d_SB%03d_sky.IM" % (otdb_id, sap['sap_nr'], sb_nr)) result[PREFIX + 'DataProducts.Output_SkyImage.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_SkyImage.filenames'] = '[' + to_csv_string(filenames) + ']' return result - def CreatePulsarPipeline(self, otdb_id, storage_properties): + def CreatePulsarPipeline(self, otdb_id, storage_properties, project_name): SB_nr = 0 locations = [] filenames = [] @@ -135,28 +135,28 @@ class RAtoOTDBTranslator(): for sap in storage_properties["saps"]: ##We might need to sort saps? if "nr_of_uv_files" in sap['properties']: for _ in range(sap['properties']['nr_of_pulp_files']): - locations.append("CEP4:/data/projects/project/L%d" % otdb_id) + locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id)) filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sap['sap_nr'], sb_nr)) result[PREFIX + 'DataProducts.Output_Pulsar.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_Pulsar.filenames'] = '[' + to_csv_string(filenames) + ']' return result - def CreateStorageKeys(self, otdb_id, storage_properties): + def CreateStorageKeys(self, otdb_id, storage_properties, project_name): logging.debug(otdb_id, storage_properties) result = {} if 'nr_of_uv_files' in storage_properties: - result.update(self.CreateCorrelated(otdb_id, storage_properties)) + result.update(self.CreateCorrelated(otdb_id, storage_properties, project_name)) if 'nr_of_cs_files' in storage_properties: - result.update(self.CreateCoherentStokes(otdb_id, storage_properties)) + result.update(self.CreateCoherentStokes(otdb_id, storage_properties, project_name)) if 'nr_of_is_files' in storage_properties: - result.update(self.CreateIncoherentStokes(otdb_id, storage_properties)) + result.update(self.CreateIncoherentStokes(otdb_id, storage_properties, project_name)) if 'nr_of_im_files' in storage_properties: - result.update(self.CreateInstrumentModel(otdb_id, storage_properties)) + result.update(self.CreateInstrumentModel(otdb_id, storage_properties, project_name)) if 'nr_of_img_files' in storage_properties: - result.update(self.CreateSkyImage(otdb_id, storage_properties)) + result.update(self.CreateSkyImage(otdb_id, storage_properties, project_name)) if 'nr_of_pulp_files' in storage_properties: - result.update(self.CreatePulsarPipeline(otdb_id, storage_properties)) + result.update(self.CreatePulsarPipeline(otdb_id, storage_properties, project_name)) return result def parseStorageProperties(self, storage_claim): @@ -172,7 +172,7 @@ class RAtoOTDBTranslator(): result[p['type_name']] = p['value'] return result - def CreateParset(self, otdb_id, ra_info): + def CreateParset(self, otdb_id, ra_info, project_name): logger.info('CreateParset: start=%s, end=%s' % (ra_info['starttime'], ra_info['endtime'])) parset = {} @@ -182,7 +182,7 @@ class RAtoOTDBTranslator(): if 'storage' in ra_info: logging.debug(ra_info['storage']) - parset.update(self.CreateStorageKeys(otdb_id, self.parseStorageProperties(ra_info['storage']))) + parset.update(self.CreateStorageKeys(otdb_id, self.parseStorageProperties(ra_info['storage']) project_name)) if 'stations' in ra_info: parset[PREFIX+'VirtualInstrument.stationList'] = ra_info["stations"] return parset -- GitLab