diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/otdbrpc.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/otdbrpc.py index 5769deeb8a4968234c517a4cb94c0599eeca16b5..4c9cbb2903cd2b24afb8a42114d9e8426158ca22 100644 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/otdbrpc.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/otdbrpc.py @@ -56,7 +56,7 @@ class OTDBRPC(RPCWrapper): def taskSetSpecification(self, otdb_id=None, specification={}): answer = self.rpc('TaskSetSpecification', OtdbID=otdb_id, Specification=specification) - if "Errors" in answer.keys(): + if "Errors" in answer: for key, problem in answer["Errors"].iteritems(): logger.warning("TaskSetSpecification for %i failed to set key %s because of %s" % (otdb_id, key, problem)) raise OTDBPRCException("TaskSetSpecification failed to set all keys for %i" % (otdb_id,)) diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py index 41cffb11797c1a0b5ce8960d44be4cc967acc104..2ad9904f130b030fa5fa7a010c8fc7fae91da82f 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py @@ -103,8 +103,13 @@ class RAtoOTDBPropagator(): logger.warning('doTaskScheduled no valid otdb_id: otdb_id=%s' % (otdb_id,)) return ra_info = self.getRAinfo(ra_id) - otdb_info = self.translator.CreateParset(mom_id, ra_info) + otdb_info = self.translator.CreateParset(otdb_id, ra_info) self.setOTDBinfo(otdb_id, otdb_info, 'scheduled') + + def parseStorageProperties(self, storage_properties): + result = {} + + return result def getRAinfo(self, ra_id): info = {} @@ -113,9 +118,8 @@ class RAtoOTDBPropagator(): claims = self.radbrpc.getResourceClaims(task_id=ra_id, extended=True, include_properties=True) for claim in claims: logger.debug(claim) - for property in claim.properties: - if resource_type_name == "storage": - info["storage"].update(resource) + if resource_type_name == "storage": + info["storage"].update(parseStorageProperties(claim["properties"])) info["starttime"] = task["starttime"] + datetime.timedelta(hours=1) #TODO Test code!!! FIXME FIXME before release info["endtime"] = task["endtime"] + datetime.timedelta(hours=1) info["status"] = task["status"] diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py index 85068ebcecc32181df89289766796be20db27438..cd962e165df19851415295da61a313eb577107f7 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py @@ -26,13 +26,14 @@ reads the info from the RA DB and sends it to OTDB in the correct format. """ import logging +from lofar.common.util import to_csv_string #from lofar.parameterset import parameterset logger = logging.getLogger(__name__) """ Prefix that is common to all parset keys, depending on the exact source. """ -PREFIX="LOFAR.ObsSW." +PREFIX="LOFAR.ObsSW.Observation." ##TODO use this. class RAtoOTDBTranslator(): @@ -41,31 +42,113 @@ class RAtoOTDBTranslator(): RAtoOTDBTranslator translates values from the RADB into parset keys to be stored in an OTDB Tree """ - def CreateStorageKeys(self, storage): + def CreateCorrelated(self, otdb_id, storage_properties): + sb_nr = 0 + locations = [] + filesnames = [] result = {} - for property in storage: - if "uv" in x: - result[PREFIX + "Observation.DataProducts.Output_Correlated.filenames"] = CreateCorrelated(resource) - if "cs" in x: - result[PREFIX + "Observation.DataProducts.Output_Correlated.filenames"] = CreateCorrelated(resource) - if "is" in x: - result[PREFIX + "Observation.DataProducts.Output_Correlated.filenames"] = CreateCorrelated(resource) - if "correlated_uv" in x: - result[PREFIX + "Observation.DataProducts.Output_Correlated.filenames"] = CreateCorrelated(resource) - - - def CreateParset(self, mom_id, ra_info): + for sap in storage_properties["saps"]: ##We might need to sort saps? + if "nr_of_uv_files" in sap: + for sap['nr_of_uv_files']: + locations.append("CEP4:/data/projects/test/L%d" % otdb_id) + filenames.append("L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sb_nr, sap['sap_nr'])) + result[PREFIX + 'DataProducts.Output_Correlated.locations'] = to_csv_string(locations) + result[PREFIX + 'DataProducts.Output_Correlated.filenames'] = to_csv_string(filenames) + + def CreateCoherentStokes(self, otdb_id, storage_properties): + SB_nr = 0 + locations = [] + filesnames = [] + result = {} + for sap in storage_properties["saps"]: ##We might need to sort saps? + if "nr_of_cs_files" in sap: + for sap['nr_of_cs_files']: + locations.append("CEP4:/data/projects/project/L%d" % otdb_id) + filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sb_nr, sap['sap_nr'])) + result[PREFIX + 'DataProducts.Output_CoherentStokes.locations'] = to_csv_string(locations) + result[PREFIX + 'DataProducts.Output_CoherentStokes.filenames'] = to_csv_string(filenames) + + def CreateIncoherentStokes(self, otdb_id, storage_properties): + SB_nr = 0 + locations = [] + filesnames = [] + result = {} + for sap in storage_properties["saps"]: ##We might need to sort saps? + if "nr_of_is_files" in sap: + for sap['nr_of_is_files']: + locations.append("CEP4:/data/projects/project/L%d" % otdb_id) + filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sb_nr, sap['sap_nr'])) + result[PREFIX + 'DataProducts.Output_IncoherentStokes.locations'] = to_csv_string(locations) + result[PREFIX + 'DataProducts.Output_IncoherentStokes.filenames'] = to_csv_string(filenames) + + def CreateCreateInstrumentModel(self, otdb_id, storage_properties): + SB_nr = 0 + locations = [] + filesnames = [] + result = {} + for sap in storage_properties["saps"]: ##We might need to sort saps? + if "nr_of_im_files" in sap: + for sap['nr_of_im_files']: + locations.append("CEP4:/data/projects/project/L%d" % otdb_id) + filenames.append("L%d_SAP%03d_SB%03d_inst.INST" % (otdb_id, sb_nr, sap['sap_nr'])) + result[PREFIX + 'DataProducts.Output_InstrumentModel.locations'] = to_csv_string(locations) + result[PREFIX + 'DataProducts.Output_InstrumentModel.filenames'] = to_csv_string(filenames) + + def CreateSkyImage(self, otdb_id, storage_properties): + SB_nr = 0 + locations = [] + filesnames = [] + result = {} + for sap in storage_properties["saps"]: ##We might need to sort saps? + if "nr_of_img_files" in sap: + for sap['nr_of_img_files']: + locations.append("CEP4:/data/projects/project/L%d" % otdb_id) + filenames.append("L%d_SAP%03d_SB%03d_sky.IM" % (otdb_id, sb_nr, sap['sap_nr'])) + result[PREFIX + 'DataProducts.Output_SkyImage.locations'] = to_csv_string(locations) + result[PREFIX + 'DataProducts.Output_SkyImage.filenames'] = to_csv_string(filenames) + + def CreatePulsarPipeline(self, otdb_id, storage_properties): + SB_nr = 0 + locations = [] + filesnames = [] + result = {} + for sap in storage_properties["saps"]: ##We might need to sort saps? + if "nr_of_uv_files" in sap: + for sap['nr_of_uv_files']: + locations.append("CEP4:/data/projects/project/L%d" % otdb_id) + filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sb_nr, sap['sap_nr'])) + result[PREFIX + 'DataProducts.Output_Pulsar.locations'] = to_csv_string(locations) + result[PREFIX + 'DataProducts.Output_Pulsar.filenames'] = to_csv_string(filenames) + + + def CreateStorageKeys(self, otdb_id, storage_properties): + result = {} + for property in storage_properties: + if property = 'uv': + result.update(CreateCorrelated(otdb_id, storage_properties)) + if property = 'cs': + result.update(CreateCoherentStokes(otdb_id, storage_properties)) + if property = 'is': + result.update(CreateIncoherentStokes(otdb_id, storage_properties)) + if property = 'im': + result.update(CreateInstrumentModel(otdb_id, storage_properties)) + if property = 'img': + result.update(CreateSkyImage(otdb_id, storage_properties)) + if property = 'pulp': + result.update(CreatePulsarPipeline(otdb_id, storage_properties)) + + def CreateParset(self, otdb_id, ra_info): logger.info('CreateParset: start=%s, end=%s' % (ra_info['starttime'], ra_info['endtime'])) parset = {} - #parset[PREFIX+'Observation.momID'] = str(mom_id) - parset[PREFIX+'Observation.startTime'] = ra_info['starttime'].strftime('%Y-%m-%d %H:%M:%S') - parset[PREFIX+'Observation.stopTime'] = ra_info['endtime'].strftime('%Y-%m-%d %H:%M:%S') - - if "storage" in ra_info: - parset.update(CreateStorageKeys(ra_info["storage"])) - if "stations" in ra_info: - parset[PREFIX+'Observation.VirtualInstrument.stationList'] = ra_info["stations"] + #parset[PREFIX+'momID'] = str(mom_id) + parset[PREFIX+'startTime'] = ra_info['starttime'].strftime('%Y-%m-%d %H:%M:%S') + parset[PREFIX+'stopTime'] = ra_info['endtime'].strftime('%Y-%m-%d %H:%M:%S') + + if 'storage' in ra_info: + parset.update(CreateStorageKeys(ra_info['storage']['properties'])) + if 'stations' in ra_info: + parset[PREFIX+'VirtualInstrument.stationList'] = ra_info["stations"] return parset