Skip to content
Snippets Groups Projects
Commit b47aa26f authored by Adriaan Renting's avatar Adriaan Renting
Browse files

Task #8886: Added writing Output_DataProduct keys

parent 43c48521
No related branches found
No related tags found
No related merge requests found
...@@ -56,7 +56,7 @@ class OTDBRPC(RPCWrapper): ...@@ -56,7 +56,7 @@ class OTDBRPC(RPCWrapper):
def taskSetSpecification(self, otdb_id=None, specification={}): def taskSetSpecification(self, otdb_id=None, specification={}):
answer = self.rpc('TaskSetSpecification', OtdbID=otdb_id, Specification=specification) answer = self.rpc('TaskSetSpecification', OtdbID=otdb_id, Specification=specification)
if "Errors" in answer.keys(): if "Errors" in answer:
for key, problem in answer["Errors"].iteritems(): for key, problem in answer["Errors"].iteritems():
logger.warning("TaskSetSpecification for %i failed to set key %s because of %s" % (otdb_id, key, problem)) logger.warning("TaskSetSpecification for %i failed to set key %s because of %s" % (otdb_id, key, problem))
raise OTDBPRCException("TaskSetSpecification failed to set all keys for %i" % (otdb_id,)) raise OTDBPRCException("TaskSetSpecification failed to set all keys for %i" % (otdb_id,))
......
...@@ -103,8 +103,13 @@ class RAtoOTDBPropagator(): ...@@ -103,8 +103,13 @@ class RAtoOTDBPropagator():
logger.warning('doTaskScheduled no valid otdb_id: otdb_id=%s' % (otdb_id,)) logger.warning('doTaskScheduled no valid otdb_id: otdb_id=%s' % (otdb_id,))
return return
ra_info = self.getRAinfo(ra_id) ra_info = self.getRAinfo(ra_id)
otdb_info = self.translator.CreateParset(mom_id, ra_info) otdb_info = self.translator.CreateParset(otdb_id, ra_info)
self.setOTDBinfo(otdb_id, otdb_info, 'scheduled') self.setOTDBinfo(otdb_id, otdb_info, 'scheduled')
def parseStorageProperties(self, storage_properties):
    """Convert RADB storage-claim properties into an info dict.

    Placeholder implementation: the input is currently ignored and an
    empty dict is returned.
    """
    return {}
def getRAinfo(self, ra_id): def getRAinfo(self, ra_id):
info = {} info = {}
...@@ -113,9 +118,8 @@ class RAtoOTDBPropagator(): ...@@ -113,9 +118,8 @@ class RAtoOTDBPropagator():
claims = self.radbrpc.getResourceClaims(task_id=ra_id, extended=True, include_properties=True) claims = self.radbrpc.getResourceClaims(task_id=ra_id, extended=True, include_properties=True)
for claim in claims: for claim in claims:
logger.debug(claim) logger.debug(claim)
for property in claim.properties: if resource_type_name == "storage":
if resource_type_name == "storage": info["storage"].update(parseStorageProperties(claim["properties"]))
info["storage"].update(resource)
info["starttime"] = task["starttime"] + datetime.timedelta(hours=1) #TODO Test code!!! FIXME FIXME before release info["starttime"] = task["starttime"] + datetime.timedelta(hours=1) #TODO Test code!!! FIXME FIXME before release
info["endtime"] = task["endtime"] + datetime.timedelta(hours=1) info["endtime"] = task["endtime"] + datetime.timedelta(hours=1)
info["status"] = task["status"] info["status"] = task["status"]
......
...@@ -26,13 +26,14 @@ reads the info from the RA DB and sends it to OTDB in the correct format. ...@@ -26,13 +26,14 @@ reads the info from the RA DB and sends it to OTDB in the correct format.
""" """
import logging import logging
from lofar.common.util import to_csv_string
#from lofar.parameterset import parameterset #from lofar.parameterset import parameterset
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
""" Prefix that is common to all parset keys, depending on the exact source. """ """ Prefix that is common to all parset keys, depending on the exact source. """
PREFIX="LOFAR.ObsSW." PREFIX="LOFAR.ObsSW.Observation."
##TODO use this. ##TODO use this.
class RAtoOTDBTranslator(): class RAtoOTDBTranslator():
...@@ -41,31 +42,113 @@ class RAtoOTDBTranslator(): ...@@ -41,31 +42,113 @@ class RAtoOTDBTranslator():
RAtoOTDBTranslator translates values from the RADB into parset keys to be stored in an OTDB Tree RAtoOTDBTranslator translates values from the RADB into parset keys to be stored in an OTDB Tree
""" """
def CreateStorageKeys(self, storage): def CreateCorrelated(self, otdb_id, storage_properties):
sb_nr = 0
locations = []
filesnames = []
result = {} result = {}
for property in storage: for sap in storage_properties["saps"]: ##We might need to sort saps?
if "uv" in x: if "nr_of_uv_files" in sap:
result[PREFIX + "Observation.DataProducts.Output_Correlated.filenames"] = CreateCorrelated(resource) for sap['nr_of_uv_files']:
if "cs" in x: locations.append("CEP4:/data/projects/test/L%d" % otdb_id)
result[PREFIX + "Observation.DataProducts.Output_Correlated.filenames"] = CreateCorrelated(resource) filenames.append("L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sb_nr, sap['sap_nr']))
if "is" in x: result[PREFIX + 'DataProducts.Output_Correlated.locations'] = to_csv_string(locations)
result[PREFIX + "Observation.DataProducts.Output_Correlated.filenames"] = CreateCorrelated(resource) result[PREFIX + 'DataProducts.Output_Correlated.filenames'] = to_csv_string(filenames)
if "correlated_uv" in x:
result[PREFIX + "Observation.DataProducts.Output_Correlated.filenames"] = CreateCorrelated(resource) def CreateCoherentStokes(self, otdb_id, storage_properties):
SB_nr = 0
locations = []
def CreateParset(self, mom_id, ra_info): filesnames = []
result = {}
for sap in storage_properties["saps"]: ##We might need to sort saps?
if "nr_of_cs_files" in sap:
for sap['nr_of_cs_files']:
locations.append("CEP4:/data/projects/project/L%d" % otdb_id)
filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sb_nr, sap['sap_nr']))
result[PREFIX + 'DataProducts.Output_CoherentStokes.locations'] = to_csv_string(locations)
result[PREFIX + 'DataProducts.Output_CoherentStokes.filenames'] = to_csv_string(filenames)
def CreateIncoherentStokes(self, otdb_id, storage_properties):
    """Generate OTDB parset keys for incoherent-stokes output dataproducts.

    Builds one CEP4 location and one filename per expected incoherent-stokes
    file, as counted per SAP in storage_properties["saps"].

    :param otdb_id: OTDB tree id, used in the L<id> path and file names
    :param storage_properties: dict with a "saps" list; each sap dict may
        carry 'nr_of_is_files' and 'sap_nr'
    :return: dict with the Output_IncoherentStokes locations/filenames keys
    """
    sb_nr = 0  # running file counter, advances across all SAPs
    locations = []
    filenames = []
    result = {}
    for sap in storage_properties["saps"]:  ##We might need to sort saps?
        if "nr_of_is_files" in sap:
            # one location/filename entry per expected file
            for _ in range(sap['nr_of_is_files']):
                locations.append("CEP4:/data/projects/project/L%d" % otdb_id)
                # SAP index comes from the sap record, SB index from the counter
                filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sap['sap_nr'], sb_nr))
                sb_nr += 1
    result[PREFIX + 'DataProducts.Output_IncoherentStokes.locations'] = to_csv_string(locations)
    result[PREFIX + 'DataProducts.Output_IncoherentStokes.filenames'] = to_csv_string(filenames)
    return result
def CreateInstrumentModel(self, otdb_id, storage_properties):
    """Generate OTDB parset keys for instrument-model output dataproducts.

    Name fixed from the doubled 'CreateCreateInstrumentModel' typo so it
    matches the CreateStorageKeys dispatch call.

    :param otdb_id: OTDB tree id, used in the L<id> path and file names
    :param storage_properties: dict with a "saps" list; each sap dict may
        carry 'nr_of_im_files' and 'sap_nr'
    :return: dict with the Output_InstrumentModel locations/filenames keys
    """
    sb_nr = 0  # running file counter, advances across all SAPs
    locations = []
    filenames = []
    result = {}
    for sap in storage_properties["saps"]:  ##We might need to sort saps?
        if "nr_of_im_files" in sap:
            # one location/filename entry per expected file
            for _ in range(sap['nr_of_im_files']):
                locations.append("CEP4:/data/projects/project/L%d" % otdb_id)
                # SAP index comes from the sap record, SB index from the counter
                filenames.append("L%d_SAP%03d_SB%03d_inst.INST" % (otdb_id, sap['sap_nr'], sb_nr))
                sb_nr += 1
    result[PREFIX + 'DataProducts.Output_InstrumentModel.locations'] = to_csv_string(locations)
    result[PREFIX + 'DataProducts.Output_InstrumentModel.filenames'] = to_csv_string(filenames)
    return result
def CreateSkyImage(self, otdb_id, storage_properties):
    """Generate OTDB parset keys for sky-image output dataproducts.

    :param otdb_id: OTDB tree id, used in the L<id> path and file names
    :param storage_properties: dict with a "saps" list; each sap dict may
        carry 'nr_of_img_files' and 'sap_nr'
    :return: dict with the Output_SkyImage locations/filenames keys
    """
    sb_nr = 0  # running file counter, advances across all SAPs
    locations = []
    filenames = []
    result = {}
    for sap in storage_properties["saps"]:  ##We might need to sort saps?
        if "nr_of_img_files" in sap:
            # one location/filename entry per expected file
            for _ in range(sap['nr_of_img_files']):
                locations.append("CEP4:/data/projects/project/L%d" % otdb_id)
                # SAP index comes from the sap record, SB index from the counter
                filenames.append("L%d_SAP%03d_SB%03d_sky.IM" % (otdb_id, sap['sap_nr'], sb_nr))
                sb_nr += 1
    result[PREFIX + 'DataProducts.Output_SkyImage.locations'] = to_csv_string(locations)
    result[PREFIX + 'DataProducts.Output_SkyImage.filenames'] = to_csv_string(filenames)
    return result
def CreatePulsarPipeline(self, otdb_id, storage_properties):
    """Generate OTDB parset keys for pulsar-pipeline output dataproducts.

    NOTE(review): this counts 'nr_of_uv_files' and names files '_bf.h5',
    which looks copy-pasted from the beam-formed generators — confirm the
    intended property key and filename suffix for pulsar output.

    :param otdb_id: OTDB tree id, used in the L<id> path and file names
    :param storage_properties: dict with a "saps" list
    :return: dict with the Output_Pulsar locations/filenames keys
    """
    sb_nr = 0  # running file counter, advances across all SAPs
    locations = []
    filenames = []
    result = {}
    for sap in storage_properties["saps"]:  ##We might need to sort saps?
        if "nr_of_uv_files" in sap:
            # one location/filename entry per expected file
            for _ in range(sap['nr_of_uv_files']):
                locations.append("CEP4:/data/projects/project/L%d" % otdb_id)
                # SAP index comes from the sap record, SB index from the counter
                filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sap['sap_nr'], sb_nr))
                sb_nr += 1
    result[PREFIX + 'DataProducts.Output_Pulsar.locations'] = to_csv_string(locations)
    result[PREFIX + 'DataProducts.Output_Pulsar.filenames'] = to_csv_string(filenames)
    return result
def CreateStorageKeys(self, otdb_id, storage_properties):
    """Generate all Output_* dataproduct parset keys for one task.

    Dispatches on the dataproduct type keys present in storage_properties
    to the matching per-type generator and merges their parset dicts.

    Fixes: '==' comparisons (the originals used '=' assignment, a syntax
    error), 'self.' on the generator calls, and the missing return.

    :param otdb_id: OTDB tree id, forwarded to the generators
    :param storage_properties: dict of storage properties from the RADB
    :return: merged dict of parset key/value pairs
    """
    result = {}
    # 'prop' instead of 'property' to avoid shadowing the builtin
    for prop in storage_properties:
        if prop == 'uv':
            result.update(self.CreateCorrelated(otdb_id, storage_properties))
        if prop == 'cs':
            result.update(self.CreateCoherentStokes(otdb_id, storage_properties))
        if prop == 'is':
            result.update(self.CreateIncoherentStokes(otdb_id, storage_properties))
        if prop == 'im':
            result.update(self.CreateInstrumentModel(otdb_id, storage_properties))
        if prop == 'img':
            result.update(self.CreateSkyImage(otdb_id, storage_properties))
        if prop == 'pulp':
            result.update(self.CreatePulsarPipeline(otdb_id, storage_properties))
    return result
def CreateParset(self, otdb_id, ra_info):
logger.info('CreateParset: start=%s, end=%s' % (ra_info['starttime'], ra_info['endtime'])) logger.info('CreateParset: start=%s, end=%s' % (ra_info['starttime'], ra_info['endtime']))
parset = {} parset = {}
#parset[PREFIX+'Observation.momID'] = str(mom_id) #parset[PREFIX+'momID'] = str(mom_id)
parset[PREFIX+'Observation.startTime'] = ra_info['starttime'].strftime('%Y-%m-%d %H:%M:%S') parset[PREFIX+'startTime'] = ra_info['starttime'].strftime('%Y-%m-%d %H:%M:%S')
parset[PREFIX+'Observation.stopTime'] = ra_info['endtime'].strftime('%Y-%m-%d %H:%M:%S') parset[PREFIX+'stopTime'] = ra_info['endtime'].strftime('%Y-%m-%d %H:%M:%S')
if "storage" in ra_info: if 'storage' in ra_info:
parset.update(CreateStorageKeys(ra_info["storage"])) parset.update(CreateStorageKeys(ra_info['storage']['properties']))
if "stations" in ra_info: if 'stations' in ra_info:
parset[PREFIX+'Observation.VirtualInstrument.stationList'] = ra_info["stations"] parset[PREFIX+'VirtualInstrument.stationList'] = ra_info["stations"]
return parset return parset
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment