From 2962822b53e6b0037ca7958789b07a6ac971b73c Mon Sep 17 00:00:00 2001 From: Adriaan Renting <renting@astron.nl> Date: Sat, 26 Mar 2016 11:13:59 +0000 Subject: [PATCH] Task #8886: Added functions to generate correct filenames and locations --- .../lib/propagator.py | 23 +--- .../lib/translator.py | 127 +++++++++++------- 2 files changed, 86 insertions(+), 64 deletions(-) diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py index 9758b4a45f2..1699ed9a869 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py @@ -104,31 +104,18 @@ class RAtoOTDBPropagator(): return ra_info = self.getRAinfo(ra_id) otdb_info = self.translator.CreateParset(otdb_id, ra_info) + logger.debug("Parset info for OTDB: %s" %otdb_info) self.setOTDBinfo(otdb_id, otdb_info, 'scheduled') - def parseStorageProperties(self, storage_properties): - result = {} - for p in storage_properties: - if p['type_name'] == 'nr_of_uv_files': - if : - result['uv'] = {'nr_of_uv_files': p['value'], 'uv_file_size': storage_properties['uv_file_size']} - if p['type_name'] == 'nr_of_cs_files' in storage_properties: - result['cs'] = {'nr_of_cs_files': storage_properties['nr_of_cs_files'], 'cs_file_size': storage_properties['cs_file_size'], 'nr_of_cs_stokes': storage_properties['nr_of_cs_stokes']} - if p['type_name'] == 'nr_of_is_files': - result['is'] = {'nr_of_is_files': storage_properties['nr_of_is_files'], 'is_file_size': storage_properties['is_file_size'], 'nr_of_is_stokes': storage_properties['nr_of_is_stokes']} - if p['type_name'] == 'nr_of_is_files': - if p['type_name'] == 'nr_of_is_files': - return result - def getRAinfo(self, ra_id): info = {} info["storage"] = {} task = self.radbrpc.getTask(ra_id) claims = self.radbrpc.getResourceClaims(task_id=ra_id, extended=True, include_properties=True) for claim in claims: - logger.debug(claim) - if resource_type_name == "storage": - info["storage"].update(parseStorageProperties(claim["properties"])) + logger.debug("Processing claim: %s" % claim) + if claim['resource_type_name'] == 'storage': + info['storage'] = claim info["starttime"] = task["starttime"] + datetime.timedelta(hours=1) #TODO Test code!!! FIXME FIXME before release info["endtime"] = task["endtime"] + datetime.timedelta(hours=1) info["status"] = task["status"] @@ -136,5 +123,5 @@ class RAtoOTDBPropagator(): def setOTDBinfo(self, otdb_id, otdb_info, otdb_status): self.otdbrpc.taskSetSpecification(otdb_id, otdb_info) - #self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"], otdb_info["LOFAR.ObsSW.Observation.stopTime"]) + self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"], otdb_info["LOFAR.ObsSW.Observation.stopTime"]) self.otdbrpc.taskSetStatus(otdb_id, otdb_status) diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py index cd962e165df..f8d36eb1f68 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py @@ -27,6 +27,7 @@ reads the info from the RA DB and sends it to OTDB in the correct format. import logging from lofar.common.util import to_csv_string +from math import ceil, floor #from lofar.parameterset import parameterset @@ -44,98 +45,131 @@ class RAtoOTDBTranslator(): def CreateCorrelated(self, otdb_id, storage_properties): sb_nr = 0 - locations = [] - filesnames = [] + locations = [] + filenames = [] result = {} for sap in storage_properties["saps"]: ##We might need to sort saps? - if "nr_of_uv_files" in sap: - for sap['nr_of_uv_files']: + logging.debug('processing sap: %s' % sap) + if "nr_of_uv_files" in sap['properties']: + for _ in xrange(sap['properties']['nr_of_uv_files']): locations.append("CEP4:/data/projects/test/L%d" % otdb_id) - filenames.append("L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sb_nr, sap['sap_nr'])) + filenames.append("L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sap['sap_nr'], sb_nr)) + sb_nr += 1 result[PREFIX + 'DataProducts.Output_Correlated.locations'] = to_csv_string(locations) result[PREFIX + 'DataProducts.Output_Correlated.filenames'] = to_csv_string(filenames) + return result def CreateCoherentStokes(self, otdb_id, storage_properties): SB_nr = 0 - locations = [] - filesnames = [] + locations = [] + filenames = [] result = {} + nr_stokes = storage_properties['nr_of_cs_stokes'] for sap in storage_properties["saps"]: ##We might need to sort saps? - if "nr_of_cs_files" in sap: - for sap['nr_of_cs_files']: - locations.append("CEP4:/data/projects/project/L%d" % otdb_id) - filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sb_nr, sap['sap_nr'])) + if "nr_of_cs_files" in sap['properties']: + nr_files = sap['properties']['nr_of_cs_files'] + nr_tabs = sap['properties']['nr_of_tabs'] + nr_parts = int(ceil(nr_files/float(nr_tabs * nr_stokes))) + for tab in xrange(nr_tabs): + for stokes in xrange(nr_stokes): + for part in xrange(nr_parts): + locations.append("CEP4:/data/projects/project/L%d" % otdb_id) + filenames.append("L%d_SAP%03d_B%03d_S%d_P%03d_bf.h5" % (otdb_id, sap['sap_nr'], tab, stokes, part)) result[PREFIX + 'DataProducts.Output_CoherentStokes.locations'] = to_csv_string(locations) result[PREFIX + 'DataProducts.Output_CoherentStokes.filenames'] = to_csv_string(filenames) + return result def CreateIncoherentStokes(self, otdb_id, storage_properties): SB_nr = 0 - locations = [] - filesnames = [] + locations = [] + filenames = [] result = {} + nr_stokes = storage_properties['nr_of_is_stokes'] for sap in storage_properties["saps"]: ##We might need to sort saps? - if "nr_of_is_files" in sap: - for sap['nr_of_is_files']: - locations.append("CEP4:/data/projects/project/L%d" % otdb_id) - filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sb_nr, sap['sap_nr'])) + if "nr_of_is_files" in sap['properties']: + nr_files = sap['properties']['nr_of_is_files'] + nr_tabs = sap['properties']['nr_of_tabs'] + nr_parts = int(ceil(nr_files/float(nr_tabs * nr_stokes))) + for tab in xrange(nr_tabs): + for stokes in xrange(nr_stokes): + for part in xrange(nr_parts): + locations.append("CEP4:/data/projects/project/L%d" % otdb_id) + filenames.append("L%d_SAP%03d_B%03d_S%d_P%03d_bf.h5" % (otdb_id, sap['sap_nr'], tab, stokes, part)) result[PREFIX + 'DataProducts.Output_IncoherentStokes.locations'] = to_csv_string(locations) result[PREFIX + 'DataProducts.Output_IncoherentStokes.filenames'] = to_csv_string(filenames) + return result def CreateCreateInstrumentModel(self, otdb_id, storage_properties): SB_nr = 0 - locations = [] - filesnames = [] + locations = [] + filenames = [] result = {} for sap in storage_properties["saps"]: ##We might need to sort saps? - if "nr_of_im_files" in sap: - for sap['nr_of_im_files']: + if "nr_of_im_files" in sap['properties']: + for _ in range(sap['properties']['nr_of_im_files']): locations.append("CEP4:/data/projects/project/L%d" % otdb_id) - filenames.append("L%d_SAP%03d_SB%03d_inst.INST" % (otdb_id, sb_nr, sap['sap_nr'])) + filenames.append("L%d_SAP%03d_SB%03d_inst.INST" % (otdb_id, sap['sap_nr'], sb_nr)) result[PREFIX + 'DataProducts.Output_InstrumentModel.locations'] = to_csv_string(locations) result[PREFIX + 'DataProducts.Output_InstrumentModel.filenames'] = to_csv_string(filenames) + return result def CreateSkyImage(self, otdb_id, storage_properties): SB_nr = 0 - locations = [] - filesnames = [] + locations = [] + filenames = [] result = {} for sap in storage_properties["saps"]: ##We might need to sort saps? - if "nr_of_img_files" in sap: - for sap['nr_of_img_files']: + if "nr_of_img_files" in sap['properties']: + for _ in range(sap['properties']['nr_of_img_files']): locations.append("CEP4:/data/projects/project/L%d" % otdb_id) - filenames.append("L%d_SAP%03d_SB%03d_sky.IM" % (otdb_id, sb_nr, sap['sap_nr'])) + filenames.append("L%d_SAP%03d_SB%03d_sky.IM" % (otdb_id, sap['sap_nr'], sb_nr)) result[PREFIX + 'DataProducts.Output_SkyImage.locations'] = to_csv_string(locations) result[PREFIX + 'DataProducts.Output_SkyImage.filenames'] = to_csv_string(filenames) + return result def CreatePulsarPipeline(self, otdb_id, storage_properties): SB_nr = 0 - locations = [] - filesnames = [] + locations = [] + filenames = [] result = {} for sap in storage_properties["saps"]: ##We might need to sort saps? - if "nr_of_uv_files" in sap: - for sap['nr_of_uv_files']: + if "nr_of_uv_files" in sap['properties']: + for _ in range(sap['properties']['nr_of_pulp_files']): locations.append("CEP4:/data/projects/project/L%d" % otdb_id) - filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sb_nr, sap['sap_nr'])) + filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sap['sap_nr'], sb_nr)) result[PREFIX + 'DataProducts.Output_Pulsar.locations'] = to_csv_string(locations) result[PREFIX + 'DataProducts.Output_Pulsar.filenames'] = to_csv_string(filenames) + return result def CreateStorageKeys(self, otdb_id, storage_properties): + logging.debug(otdb_id, storage_properties) + result = {} + if 'nr_of_uv_files' in storage_properties: + result.update(self.CreateCorrelated(otdb_id, storage_properties)) + if 'nr_of_cs_files' in storage_properties: + result.update(self.CreateCoherentStokes(otdb_id, storage_properties)) + if 'nr_of_is_files' in storage_properties: + result.update(self.CreateIncoherentStokes(otdb_id, storage_properties)) + if 'nr_of_im_files' in storage_properties: + result.update(self.CreateInstrumentModel(otdb_id, storage_properties)) + if 'nr_of_img_files' in storage_properties: + result.update(self.CreateSkyImage(otdb_id, storage_properties)) + if 'nr_of_pulp_files' in storage_properties: + result.update(self.CreatePulsarPipeline(otdb_id, storage_properties)) + return result + + def parseStorageProperties(self, storage_claim): result = {} - for property in storage_properties: - if property = 'uv': - result.update(CreateCorrelated(otdb_id, storage_properties)) - if property = 'cs': - result.update(CreateCoherentStokes(otdb_id, storage_properties)) - if property = 'is': - result.update(CreateIncoherentStokes(otdb_id, storage_properties)) - if property = 'im': - result.update(CreateInstrumentModel(otdb_id, storage_properties)) - if property = 'img': - result.update(CreateSkyImage(otdb_id, storage_properties)) - if property = 'pulp': - result.update(CreatePulsarPipeline(otdb_id, storage_properties)) + result['saps'] = [] + for s in storage_claim['saps']: + properties = {} + for p in s['properties']: + properties[p['type_name']] = p['value'] + result['saps'].append({'sap_nr' : s['sap_nr'], 'properties': properties}) + for p in storage_claim['properties']: + result[p['type_name']] = p['value'] + return result def CreateParset(self, otdb_id, ra_info): logger.info('CreateParset: start=%s, end=%s' % (ra_info['starttime'], ra_info['endtime'])) @@ -146,7 +180,8 @@ class RAtoOTDBTranslator(): parset[PREFIX+'stopTime'] = ra_info['endtime'].strftime('%Y-%m-%d %H:%M:%S') if 'storage' in ra_info: - parset.update(CreateStorageKeys(ra_info['storage']['properties'])) + logging.debug(ra_info['storage']) + parset.update(self.CreateStorageKeys(otdb_id, self.parseStorageProperties(ra_info['storage']))) if 'stations' in ra_info: parset[PREFIX+'VirtualInstrument.stationList'] = ra_info["stations"] return parset -- GitLab