Skip to content
Snippets Groups Projects
Commit fb77fdce authored by Adriaan Renting's avatar Adriaan Renting
Browse files

Task #8886: Added project name in generated directory

parent 856f3558
No related branches found
No related tags found
No related merge requests found
...@@ -40,6 +40,10 @@ from lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.otdbrpc im ...@@ -40,6 +40,10 @@ from lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.otdbrpc im
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.translator import RAtoOTDBTranslator from lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.translator import RAtoOTDBTranslator
from lofar.mom.momqueryservice.momqueryrpc import MoMRPC
from lofar.mom.momqueryservice.config import DEFAULT_BUSNAME as DEFAULT_MOM_BUSNAME
from lofar.mom.momqueryservice.config import DEFAULT_SERVICENAME as DEFAULT_MOM_SERVICENAME
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -50,6 +54,8 @@ class RAtoOTDBPropagator(): ...@@ -50,6 +54,8 @@ class RAtoOTDBPropagator():
radb_broker=None, radb_broker=None,
otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME, otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
otdb_servicename=DEFAULT_OTDB_SERVICENAME, otdb_servicename=DEFAULT_OTDB_SERVICENAME,
mom_busname=DEFAULT_MOM_BUSNAME,
mom_servicename=DEFAULT_MOM_SERVICENAME,
otdb_broker=None, otdb_broker=None,
broker=None): broker=None):
""" """
...@@ -68,6 +74,7 @@ class RAtoOTDBPropagator(): ...@@ -68,6 +74,7 @@ class RAtoOTDBPropagator():
self.radbrpc = RADBRPC(busname=radb_busname, servicename=radb_servicename, broker=radb_broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now self.radbrpc = RADBRPC(busname=radb_busname, servicename=radb_servicename, broker=radb_broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now
self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=otdb_broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=otdb_broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now
self.momrpc = MoMRPC(busname=mom_busname, servicename=mom_servicename, broker=options.broker)
self.translator = RAtoOTDBTranslator() self.translator = RAtoOTDBTranslator()
def __enter__(self): def __enter__(self):
...@@ -83,11 +90,13 @@ class RAtoOTDBPropagator(): ...@@ -83,11 +90,13 @@ class RAtoOTDBPropagator():
"""Open rpc connections to radb service and resource estimator service""" """Open rpc connections to radb service and resource estimator service"""
self.radbrpc.open() self.radbrpc.open()
self.otdbrpc.open() self.otdbrpc.open()
self.momrpc.open()
def close(self): def close(self):
"""Close rpc connections to radb service and resource estimator service""" """Close rpc connections to radb service and resource estimator service"""
self.radbrpc.close() self.radbrpc.close()
self.otdbrpc.close() self.otdbrpc.close()
self.momrpc.close()
def doTaskConflict(self, ra_id, otdb_id, mom_id): def doTaskConflict(self, ra_id, otdb_id, mom_id):
logger.info('doTaskConflict: otdb_id=%s mom_id=%s' % (otdb_id, mom_id)) logger.info('doTaskConflict: otdb_id=%s mom_id=%s' % (otdb_id, mom_id))
...@@ -102,7 +111,9 @@ class RAtoOTDBPropagator(): ...@@ -102,7 +111,9 @@ class RAtoOTDBPropagator():
logger.warning('doTaskScheduled no valid otdb_id: otdb_id=%s' % (otdb_id,)) logger.warning('doTaskScheduled no valid otdb_id: otdb_id=%s' % (otdb_id,))
return return
ra_info = self.getRAinfo(ra_id) ra_info = self.getRAinfo(ra_id)
otdb_info = self.translator.CreateParset(otdb_id, ra_info) project = self.momrpc.getProjectDetails(mom_id)
project_name = "_".join(project['project_name'].split())
otdb_info = self.translator.CreateParset(otdb_id, ra_info, project_name)
logger.debug("Parset info for OTDB: %s" %otdb_info) logger.debug("Parset info for OTDB: %s" %otdb_info)
self.setOTDBinfo(otdb_id, otdb_info, 'scheduled') self.setOTDBinfo(otdb_id, otdb_info, 'scheduled')
......
...@@ -96,6 +96,9 @@ def main(): ...@@ -96,6 +96,9 @@ def main():
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.mom.momqueryservice.config import DEFAULT_BUSNAME as DEFAULT_MOM_BUSNAME
from lofar.mom.momqueryservice.config import DEFAULT_SERVICENAME as DEFAULT_MOM_SERVICENAME
# Check the invocation arguments # Check the invocation arguments
parser = OptionParser("%prog [options]", parser = OptionParser("%prog [options]",
...@@ -107,6 +110,8 @@ def main(): ...@@ -107,6 +110,8 @@ def main():
parser.add_option("--radb_servicename", dest="radb_servicename", type="string", default=RADB_SERVICENAME, help="Name of the RADB service, default: %default") parser.add_option("--radb_servicename", dest="radb_servicename", type="string", default=RADB_SERVICENAME, help="Name of the RADB service, default: %default")
parser.add_option("--otdb_busname", dest="otdb_busname", type="string", default=DEFAULT_OTDB_SERVICE_BUSNAME, help="Name of the bus on which the OTDB service listens, default: %default") parser.add_option("--otdb_busname", dest="otdb_busname", type="string", default=DEFAULT_OTDB_SERVICE_BUSNAME, help="Name of the bus on which the OTDB service listens, default: %default")
parser.add_option("--otdb_servicename", dest="otdb_servicename", type="string", default=DEFAULT_OTDB_SERVICENAME, help="Name of the OTDB service, default: %default") parser.add_option("--otdb_servicename", dest="otdb_servicename", type="string", default=DEFAULT_OTDB_SERVICENAME, help="Name of the OTDB service, default: %default")
parser.add_option("--mom_busname", dest="mom_busname", type="string", default=DEFAULT_MOM_BUSNAME, help="Name of the bus on which the MoM service listens, default: %default")
parser.add_option("--mom_servicename", dest="mom_servicename", type="string", default=DEFAULT_MOM_SERVICENAME, help="Name of the MoM service, default: %default")
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
(options, args) = parser.parse_args() (options, args) = parser.parse_args()
...@@ -118,6 +123,8 @@ def main(): ...@@ -118,6 +123,8 @@ def main():
radb_servicename=options.radb_servicename, radb_servicename=options.radb_servicename,
otdb_busname=options.otdb_busname, otdb_busname=options.otdb_busname,
otdb_servicename=options.otdb_servicename, otdb_servicename=options.otdb_servicename,
mom_busname=options.mom_busname,
mom_servicename=options.mom_servicename,
broker=options.broker) as propagator: broker=options.broker) as propagator:
with RATaskStatusChangedListener(busname=options.notification_busname, with RATaskStatusChangedListener(busname=options.notification_busname,
subject=options.notification_subject, subject=options.notification_subject,
......
...@@ -43,7 +43,7 @@ class RAtoOTDBTranslator(): ...@@ -43,7 +43,7 @@ class RAtoOTDBTranslator():
RAtoOTDBTranslator translates values from the RADB into parset keys to be stored in an OTDB Tree RAtoOTDBTranslator translates values from the RADB into parset keys to be stored in an OTDB Tree
""" """
def CreateCorrelated(self, otdb_id, storage_properties): def CreateCorrelated(self, otdb_id, storage_properties, project_name):
sb_nr = 0 sb_nr = 0
locations = [] locations = []
filenames = [] filenames = []
...@@ -52,14 +52,14 @@ class RAtoOTDBTranslator(): ...@@ -52,14 +52,14 @@ class RAtoOTDBTranslator():
logging.debug('processing sap: %s' % sap) logging.debug('processing sap: %s' % sap)
if "nr_of_uv_files" in sap['properties']: if "nr_of_uv_files" in sap['properties']:
for _ in xrange(sap['properties']['nr_of_uv_files']): for _ in xrange(sap['properties']['nr_of_uv_files']):
locations.append("CEP4:/data/projects/test/L%d" % otdb_id) locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id))
filenames.append("L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sap['sap_nr'], sb_nr)) filenames.append("L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sap['sap_nr'], sb_nr))
sb_nr += 1 sb_nr += 1
result[PREFIX + 'DataProducts.Output_Correlated.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_Correlated.locations'] = '[' + to_csv_string(locations) + ']'
result[PREFIX + 'DataProducts.Output_Correlated.filenames'] = '[' + to_csv_string(filenames) + ']' result[PREFIX + 'DataProducts.Output_Correlated.filenames'] = '[' + to_csv_string(filenames) + ']'
return result return result
def CreateCoherentStokes(self, otdb_id, storage_properties): def CreateCoherentStokes(self, otdb_id, storage_properties, project_name):
SB_nr = 0 SB_nr = 0
locations = [] locations = []
filenames = [] filenames = []
...@@ -73,13 +73,13 @@ class RAtoOTDBTranslator(): ...@@ -73,13 +73,13 @@ class RAtoOTDBTranslator():
for tab in xrange(nr_tabs): for tab in xrange(nr_tabs):
for stokes in xrange(nr_stokes): for stokes in xrange(nr_stokes):
for part in xrange(nr_parts): for part in xrange(nr_parts):
locations.append("CEP4:/data/projects/project/L%d" % otdb_id) locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id))
filenames.append("L%d_SAP%03d_B%03d_S%d_P%03d_bf.h5" % (otdb_id, sap['sap_nr'], tab, stokes, part)) filenames.append("L%d_SAP%03d_B%03d_S%d_P%03d_bf.h5" % (otdb_id, sap['sap_nr'], tab, stokes, part))
result[PREFIX + 'DataProducts.Output_CoherentStokes.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_CoherentStokes.locations'] = '[' + to_csv_string(locations) + ']'
result[PREFIX + 'DataProducts.Output_CoherentStokes.filenames'] = '[' + to_csv_string(filenames) + ']' result[PREFIX + 'DataProducts.Output_CoherentStokes.filenames'] = '[' + to_csv_string(filenames) + ']'
return result return result
def CreateIncoherentStokes(self, otdb_id, storage_properties): def CreateIncoherentStokes(self, otdb_id, storage_properties, project_name):
SB_nr = 0 SB_nr = 0
locations = [] locations = []
filenames = [] filenames = []
...@@ -93,13 +93,13 @@ class RAtoOTDBTranslator(): ...@@ -93,13 +93,13 @@ class RAtoOTDBTranslator():
for tab in xrange(nr_tabs): for tab in xrange(nr_tabs):
for stokes in xrange(nr_stokes): for stokes in xrange(nr_stokes):
for part in xrange(nr_parts): for part in xrange(nr_parts):
locations.append("CEP4:/data/projects/project/L%d" % otdb_id) locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id))
filenames.append("L%d_SAP%03d_B%03d_S%d_P%03d_bf.h5" % (otdb_id, sap['sap_nr'], tab, stokes, part)) filenames.append("L%d_SAP%03d_B%03d_S%d_P%03d_bf.h5" % (otdb_id, sap['sap_nr'], tab, stokes, part))
result[PREFIX + 'DataProducts.Output_IncoherentStokes.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_IncoherentStokes.locations'] = '[' + to_csv_string(locations) + ']'
result[PREFIX + 'DataProducts.Output_IncoherentStokes.filenames'] = '[' + to_csv_string(filenames) + ']' result[PREFIX + 'DataProducts.Output_IncoherentStokes.filenames'] = '[' + to_csv_string(filenames) + ']'
return result return result
def CreateCreateInstrumentModel(self, otdb_id, storage_properties): def CreateCreateInstrumentModel(self, otdb_id, storage_properties, project_name):
SB_nr = 0 SB_nr = 0
locations = [] locations = []
filenames = [] filenames = []
...@@ -107,13 +107,13 @@ class RAtoOTDBTranslator(): ...@@ -107,13 +107,13 @@ class RAtoOTDBTranslator():
for sap in storage_properties["saps"]: ##We might need to sort saps? for sap in storage_properties["saps"]: ##We might need to sort saps?
if "nr_of_im_files" in sap['properties']: if "nr_of_im_files" in sap['properties']:
for _ in range(sap['properties']['nr_of_im_files']): for _ in range(sap['properties']['nr_of_im_files']):
locations.append("CEP4:/data/projects/project/L%d" % otdb_id) locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id))
filenames.append("L%d_SAP%03d_SB%03d_inst.INST" % (otdb_id, sap['sap_nr'], sb_nr)) filenames.append("L%d_SAP%03d_SB%03d_inst.INST" % (otdb_id, sap['sap_nr'], sb_nr))
result[PREFIX + 'DataProducts.Output_InstrumentModel.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_InstrumentModel.locations'] = '[' + to_csv_string(locations) + ']'
result[PREFIX + 'DataProducts.Output_InstrumentModel.filenames'] = '[' + to_csv_string(filenames) + ']' result[PREFIX + 'DataProducts.Output_InstrumentModel.filenames'] = '[' + to_csv_string(filenames) + ']'
return result return result
def CreateSkyImage(self, otdb_id, storage_properties): def CreateSkyImage(self, otdb_id, storage_properties, project_name):
SB_nr = 0 SB_nr = 0
locations = [] locations = []
filenames = [] filenames = []
...@@ -121,13 +121,13 @@ class RAtoOTDBTranslator(): ...@@ -121,13 +121,13 @@ class RAtoOTDBTranslator():
for sap in storage_properties["saps"]: ##We might need to sort saps? for sap in storage_properties["saps"]: ##We might need to sort saps?
if "nr_of_img_files" in sap['properties']: if "nr_of_img_files" in sap['properties']:
for _ in range(sap['properties']['nr_of_img_files']): for _ in range(sap['properties']['nr_of_img_files']):
locations.append("CEP4:/data/projects/project/L%d" % otdb_id) locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id))
filenames.append("L%d_SAP%03d_SB%03d_sky.IM" % (otdb_id, sap['sap_nr'], sb_nr)) filenames.append("L%d_SAP%03d_SB%03d_sky.IM" % (otdb_id, sap['sap_nr'], sb_nr))
result[PREFIX + 'DataProducts.Output_SkyImage.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_SkyImage.locations'] = '[' + to_csv_string(locations) + ']'
result[PREFIX + 'DataProducts.Output_SkyImage.filenames'] = '[' + to_csv_string(filenames) + ']' result[PREFIX + 'DataProducts.Output_SkyImage.filenames'] = '[' + to_csv_string(filenames) + ']'
return result return result
def CreatePulsarPipeline(self, otdb_id, storage_properties): def CreatePulsarPipeline(self, otdb_id, storage_properties, project_name):
SB_nr = 0 SB_nr = 0
locations = [] locations = []
filenames = [] filenames = []
...@@ -135,28 +135,28 @@ class RAtoOTDBTranslator(): ...@@ -135,28 +135,28 @@ class RAtoOTDBTranslator():
for sap in storage_properties["saps"]: ##We might need to sort saps? for sap in storage_properties["saps"]: ##We might need to sort saps?
if "nr_of_uv_files" in sap['properties']: if "nr_of_uv_files" in sap['properties']:
for _ in range(sap['properties']['nr_of_pulp_files']): for _ in range(sap['properties']['nr_of_pulp_files']):
locations.append("CEP4:/data/projects/project/L%d" % otdb_id) locations.append("CEP4:/data/projects/%s/L%d" % (project_name, otdb_id))
filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sap['sap_nr'], sb_nr)) filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sap['sap_nr'], sb_nr))
result[PREFIX + 'DataProducts.Output_Pulsar.locations'] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.Output_Pulsar.locations'] = '[' + to_csv_string(locations) + ']'
result[PREFIX + 'DataProducts.Output_Pulsar.filenames'] = '[' + to_csv_string(filenames) + ']' result[PREFIX + 'DataProducts.Output_Pulsar.filenames'] = '[' + to_csv_string(filenames) + ']'
return result return result
def CreateStorageKeys(self, otdb_id, storage_properties): def CreateStorageKeys(self, otdb_id, storage_properties, project_name):
logging.debug(otdb_id, storage_properties) logging.debug(otdb_id, storage_properties)
result = {} result = {}
if 'nr_of_uv_files' in storage_properties: if 'nr_of_uv_files' in storage_properties:
result.update(self.CreateCorrelated(otdb_id, storage_properties)) result.update(self.CreateCorrelated(otdb_id, storage_properties, project_name))
if 'nr_of_cs_files' in storage_properties: if 'nr_of_cs_files' in storage_properties:
result.update(self.CreateCoherentStokes(otdb_id, storage_properties)) result.update(self.CreateCoherentStokes(otdb_id, storage_properties, project_name))
if 'nr_of_is_files' in storage_properties: if 'nr_of_is_files' in storage_properties:
result.update(self.CreateIncoherentStokes(otdb_id, storage_properties)) result.update(self.CreateIncoherentStokes(otdb_id, storage_properties, project_name))
if 'nr_of_im_files' in storage_properties: if 'nr_of_im_files' in storage_properties:
result.update(self.CreateInstrumentModel(otdb_id, storage_properties)) result.update(self.CreateInstrumentModel(otdb_id, storage_properties, project_name))
if 'nr_of_img_files' in storage_properties: if 'nr_of_img_files' in storage_properties:
result.update(self.CreateSkyImage(otdb_id, storage_properties)) result.update(self.CreateSkyImage(otdb_id, storage_properties, project_name))
if 'nr_of_pulp_files' in storage_properties: if 'nr_of_pulp_files' in storage_properties:
result.update(self.CreatePulsarPipeline(otdb_id, storage_properties)) result.update(self.CreatePulsarPipeline(otdb_id, storage_properties, project_name))
return result return result
def parseStorageProperties(self, storage_claim): def parseStorageProperties(self, storage_claim):
...@@ -172,7 +172,7 @@ class RAtoOTDBTranslator(): ...@@ -172,7 +172,7 @@ class RAtoOTDBTranslator():
result[p['type_name']] = p['value'] result[p['type_name']] = p['value']
return result return result
def CreateParset(self, otdb_id, ra_info): def CreateParset(self, otdb_id, ra_info, project_name):
logger.info('CreateParset: start=%s, end=%s' % (ra_info['starttime'], ra_info['endtime'])) logger.info('CreateParset: start=%s, end=%s' % (ra_info['starttime'], ra_info['endtime']))
parset = {} parset = {}
...@@ -182,7 +182,7 @@ class RAtoOTDBTranslator(): ...@@ -182,7 +182,7 @@ class RAtoOTDBTranslator():
if 'storage' in ra_info: if 'storage' in ra_info:
logging.debug(ra_info['storage']) logging.debug(ra_info['storage'])
parset.update(self.CreateStorageKeys(otdb_id, self.parseStorageProperties(ra_info['storage']))) parset.update(self.CreateStorageKeys(otdb_id, self.parseStorageProperties(ra_info['storage']) project_name))
if 'stations' in ra_info: if 'stations' in ra_info:
parset[PREFIX+'VirtualInstrument.stationList'] = ra_info["stations"] parset[PREFIX+'VirtualInstrument.stationList'] = ra_info["stations"]
return parset return parset
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment