Skip to content
Snippets Groups Projects
Commit 2962822b authored by Adriaan Renting's avatar Adriaan Renting
Browse files

Task #8886: Added functions to generate correct filenames and locations

parent 9b9b30c7
No related branches found
No related tags found
No related merge requests found
...@@ -104,31 +104,18 @@ class RAtoOTDBPropagator(): ...@@ -104,31 +104,18 @@ class RAtoOTDBPropagator():
return return
ra_info = self.getRAinfo(ra_id) ra_info = self.getRAinfo(ra_id)
otdb_info = self.translator.CreateParset(otdb_id, ra_info) otdb_info = self.translator.CreateParset(otdb_id, ra_info)
logger.debug("Parset info for OTDB: %s" %otdb_info)
self.setOTDBinfo(otdb_id, otdb_info, 'scheduled') self.setOTDBinfo(otdb_id, otdb_info, 'scheduled')
def parseStorageProperties(self, storage_properties):
result = {}
for p in storage_properties:
if p['type_name'] == 'nr_of_uv_files':
if :
result['uv'] = {'nr_of_uv_files': p['value'], 'uv_file_size': storage_properties['uv_file_size']}
if p['type_name'] == 'nr_of_cs_files' in storage_properties:
result['cs'] = {'nr_of_cs_files': storage_properties['nr_of_cs_files'], 'cs_file_size': storage_properties['cs_file_size'], 'nr_of_cs_stokes': storage_properties['nr_of_cs_stokes']}
if p['type_name'] == 'nr_of_is_files':
result['is'] = {'nr_of_is_files': storage_properties['nr_of_is_files'], 'is_file_size': storage_properties['is_file_size'], 'nr_of_is_stokes': storage_properties['nr_of_is_stokes']}
if p['type_name'] == 'nr_of_is_files':
if p['type_name'] == 'nr_of_is_files':
return result
def getRAinfo(self, ra_id): def getRAinfo(self, ra_id):
info = {} info = {}
info["storage"] = {} info["storage"] = {}
task = self.radbrpc.getTask(ra_id) task = self.radbrpc.getTask(ra_id)
claims = self.radbrpc.getResourceClaims(task_id=ra_id, extended=True, include_properties=True) claims = self.radbrpc.getResourceClaims(task_id=ra_id, extended=True, include_properties=True)
for claim in claims: for claim in claims:
logger.debug(claim) logger.debug("Processing claim: %s" % claim)
if resource_type_name == "storage": if claim['resource_type_name'] == 'storage':
info["storage"].update(parseStorageProperties(claim["properties"])) info['storage'] = claim
info["starttime"] = task["starttime"] + datetime.timedelta(hours=1) #TODO Test code!!! FIXME FIXME before release info["starttime"] = task["starttime"] + datetime.timedelta(hours=1) #TODO Test code!!! FIXME FIXME before release
info["endtime"] = task["endtime"] + datetime.timedelta(hours=1) info["endtime"] = task["endtime"] + datetime.timedelta(hours=1)
info["status"] = task["status"] info["status"] = task["status"]
...@@ -136,5 +123,5 @@ class RAtoOTDBPropagator(): ...@@ -136,5 +123,5 @@ class RAtoOTDBPropagator():
def setOTDBinfo(self, otdb_id, otdb_info, otdb_status): def setOTDBinfo(self, otdb_id, otdb_info, otdb_status):
self.otdbrpc.taskSetSpecification(otdb_id, otdb_info) self.otdbrpc.taskSetSpecification(otdb_id, otdb_info)
#self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"], otdb_info["LOFAR.ObsSW.Observation.stopTime"]) self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"], otdb_info["LOFAR.ObsSW.Observation.stopTime"])
self.otdbrpc.taskSetStatus(otdb_id, otdb_status) self.otdbrpc.taskSetStatus(otdb_id, otdb_status)
...@@ -27,6 +27,7 @@ reads the info from the RA DB and sends it to OTDB in the correct format. ...@@ -27,6 +27,7 @@ reads the info from the RA DB and sends it to OTDB in the correct format.
import logging import logging
from lofar.common.util import to_csv_string from lofar.common.util import to_csv_string
from math import ceil, floor
#from lofar.parameterset import parameterset #from lofar.parameterset import parameterset
...@@ -44,98 +45,131 @@ class RAtoOTDBTranslator(): ...@@ -44,98 +45,131 @@ class RAtoOTDBTranslator():
def CreateCorrelated(self, otdb_id, storage_properties): def CreateCorrelated(self, otdb_id, storage_properties):
sb_nr = 0 sb_nr = 0
locations = [] locations = []
filesnames = [] filenames = []
result = {} result = {}
for sap in storage_properties["saps"]: ##We might need to sort saps? for sap in storage_properties["saps"]: ##We might need to sort saps?
if "nr_of_uv_files" in sap: logging.debug('processing sap: %s' % sap)
for sap['nr_of_uv_files']: if "nr_of_uv_files" in sap['properties']:
for _ in xrange(sap['properties']['nr_of_uv_files']):
locations.append("CEP4:/data/projects/test/L%d" % otdb_id) locations.append("CEP4:/data/projects/test/L%d" % otdb_id)
filenames.append("L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sb_nr, sap['sap_nr'])) filenames.append("L%d_SAP%03d_SB%03d_uv.MS" % (otdb_id, sap['sap_nr'], sb_nr))
sb_nr += 1
result[PREFIX + 'DataProducts.Output_Correlated.locations'] = to_csv_string(locations) result[PREFIX + 'DataProducts.Output_Correlated.locations'] = to_csv_string(locations)
result[PREFIX + 'DataProducts.Output_Correlated.filenames'] = to_csv_string(filenames) result[PREFIX + 'DataProducts.Output_Correlated.filenames'] = to_csv_string(filenames)
return result
def CreateCoherentStokes(self, otdb_id, storage_properties): def CreateCoherentStokes(self, otdb_id, storage_properties):
SB_nr = 0 SB_nr = 0
locations = [] locations = []
filesnames = [] filenames = []
result = {} result = {}
nr_stokes = storage_properties['nr_of_cs_stokes']
for sap in storage_properties["saps"]: ##We might need to sort saps? for sap in storage_properties["saps"]: ##We might need to sort saps?
if "nr_of_cs_files" in sap: if "nr_of_cs_files" in sap['properties']:
for sap['nr_of_cs_files']: nr_files = sap['properties']['nr_of_cs_files']
locations.append("CEP4:/data/projects/project/L%d" % otdb_id) nr_tabs = sap['properties']['nr_of_tabs']
filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sb_nr, sap['sap_nr'])) nr_parts = int(ceil(nr_files/float(nr_tabs * nr_stokes)))
for tab in xrange(nr_tabs):
for stokes in xrange(nr_stokes):
for part in xrange(nr_parts):
locations.append("CEP4:/data/projects/project/L%d" % otdb_id)
filenames.append("L%d_SAP%03d_B%03d_S%d_P%03d_bf.h5" % (otdb_id, sap['sap_nr'], tab, stokes, part))
result[PREFIX + 'DataProducts.Output_CoherentStokes.locations'] = to_csv_string(locations) result[PREFIX + 'DataProducts.Output_CoherentStokes.locations'] = to_csv_string(locations)
result[PREFIX + 'DataProducts.Output_CoherentStokes.filenames'] = to_csv_string(filenames) result[PREFIX + 'DataProducts.Output_CoherentStokes.filenames'] = to_csv_string(filenames)
return result
def CreateIncoherentStokes(self, otdb_id, storage_properties): def CreateIncoherentStokes(self, otdb_id, storage_properties):
SB_nr = 0 SB_nr = 0
locations = [] locations = []
filesnames = [] filenames = []
result = {} result = {}
nr_stokes = storage_properties['nr_of_is_stokes']
for sap in storage_properties["saps"]: ##We might need to sort saps? for sap in storage_properties["saps"]: ##We might need to sort saps?
if "nr_of_is_files" in sap: if "nr_of_is_files" in sap['properties']:
for sap['nr_of_is_files']: nr_files = sap['properties']['nr_of_is_files']
locations.append("CEP4:/data/projects/project/L%d" % otdb_id) nr_tabs = sap['properties']['nr_of_tabs']
filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sb_nr, sap['sap_nr'])) nr_parts = int(ceil(nr_files/float(nr_tabs * nr_stokes)))
for tab in xrange(nr_tabs):
for stokes in xrange(nr_stokes):
for part in xrange(nr_parts):
locations.append("CEP4:/data/projects/project/L%d" % otdb_id)
filenames.append("L%d_SAP%03d_B%03d_S%d_P%03d_bf.h5" % (otdb_id, sap['sap_nr'], tab, stokes, part))
result[PREFIX + 'DataProducts.Output_IncoherentStokes.locations'] = to_csv_string(locations) result[PREFIX + 'DataProducts.Output_IncoherentStokes.locations'] = to_csv_string(locations)
result[PREFIX + 'DataProducts.Output_IncoherentStokes.filenames'] = to_csv_string(filenames) result[PREFIX + 'DataProducts.Output_IncoherentStokes.filenames'] = to_csv_string(filenames)
return result
def CreateCreateInstrumentModel(self, otdb_id, storage_properties): def CreateCreateInstrumentModel(self, otdb_id, storage_properties):
SB_nr = 0 SB_nr = 0
locations = [] locations = []
filesnames = [] filenames = []
result = {} result = {}
for sap in storage_properties["saps"]: ##We might need to sort saps? for sap in storage_properties["saps"]: ##We might need to sort saps?
if "nr_of_im_files" in sap: if "nr_of_im_files" in sap['properties']:
for sap['nr_of_im_files']: for _ in range(sap['properties']['nr_of_im_files']):
locations.append("CEP4:/data/projects/project/L%d" % otdb_id) locations.append("CEP4:/data/projects/project/L%d" % otdb_id)
filenames.append("L%d_SAP%03d_SB%03d_inst.INST" % (otdb_id, sb_nr, sap['sap_nr'])) filenames.append("L%d_SAP%03d_SB%03d_inst.INST" % (otdb_id, sap['sap_nr'], sb_nr))
result[PREFIX + 'DataProducts.Output_InstrumentModel.locations'] = to_csv_string(locations) result[PREFIX + 'DataProducts.Output_InstrumentModel.locations'] = to_csv_string(locations)
result[PREFIX + 'DataProducts.Output_InstrumentModel.filenames'] = to_csv_string(filenames) result[PREFIX + 'DataProducts.Output_InstrumentModel.filenames'] = to_csv_string(filenames)
return result
def CreateSkyImage(self, otdb_id, storage_properties): def CreateSkyImage(self, otdb_id, storage_properties):
SB_nr = 0 SB_nr = 0
locations = [] locations = []
filesnames = [] filenames = []
result = {} result = {}
for sap in storage_properties["saps"]: ##We might need to sort saps? for sap in storage_properties["saps"]: ##We might need to sort saps?
if "nr_of_img_files" in sap: if "nr_of_img_files" in sap['properties']:
for sap['nr_of_img_files']: for _ in range(sap['properties']['nr_of_img_files']):
locations.append("CEP4:/data/projects/project/L%d" % otdb_id) locations.append("CEP4:/data/projects/project/L%d" % otdb_id)
filenames.append("L%d_SAP%03d_SB%03d_sky.IM" % (otdb_id, sb_nr, sap['sap_nr'])) filenames.append("L%d_SAP%03d_SB%03d_sky.IM" % (otdb_id, sap['sap_nr'], sb_nr))
result[PREFIX + 'DataProducts.Output_SkyImage.locations'] = to_csv_string(locations) result[PREFIX + 'DataProducts.Output_SkyImage.locations'] = to_csv_string(locations)
result[PREFIX + 'DataProducts.Output_SkyImage.filenames'] = to_csv_string(filenames) result[PREFIX + 'DataProducts.Output_SkyImage.filenames'] = to_csv_string(filenames)
return result
def CreatePulsarPipeline(self, otdb_id, storage_properties): def CreatePulsarPipeline(self, otdb_id, storage_properties):
SB_nr = 0 SB_nr = 0
locations = [] locations = []
filesnames = [] filenames = []
result = {} result = {}
for sap in storage_properties["saps"]: ##We might need to sort saps? for sap in storage_properties["saps"]: ##We might need to sort saps?
if "nr_of_uv_files" in sap: if "nr_of_uv_files" in sap['properties']:
for sap['nr_of_uv_files']: for _ in range(sap['properties']['nr_of_pulp_files']):
locations.append("CEP4:/data/projects/project/L%d" % otdb_id) locations.append("CEP4:/data/projects/project/L%d" % otdb_id)
filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sb_nr, sap['sap_nr'])) filenames.append("L%d_SAP%03d_SB%03d_bf.h5" % (otdb_id, sap['sap_nr'], sb_nr))
result[PREFIX + 'DataProducts.Output_Pulsar.locations'] = to_csv_string(locations) result[PREFIX + 'DataProducts.Output_Pulsar.locations'] = to_csv_string(locations)
result[PREFIX + 'DataProducts.Output_Pulsar.filenames'] = to_csv_string(filenames) result[PREFIX + 'DataProducts.Output_Pulsar.filenames'] = to_csv_string(filenames)
return result
def CreateStorageKeys(self, otdb_id, storage_properties): def CreateStorageKeys(self, otdb_id, storage_properties):
logging.debug(otdb_id, storage_properties)
result = {}
if 'nr_of_uv_files' in storage_properties:
result.update(self.CreateCorrelated(otdb_id, storage_properties))
if 'nr_of_cs_files' in storage_properties:
result.update(self.CreateCoherentStokes(otdb_id, storage_properties))
if 'nr_of_is_files' in storage_properties:
result.update(self.CreateIncoherentStokes(otdb_id, storage_properties))
if 'nr_of_im_files' in storage_properties:
result.update(self.CreateInstrumentModel(otdb_id, storage_properties))
if 'nr_of_img_files' in storage_properties:
result.update(self.CreateSkyImage(otdb_id, storage_properties))
if 'nr_of_pulp_files' in storage_properties:
result.update(self.CreatePulsarPipeline(otdb_id, storage_properties))
return result
def parseStorageProperties(self, storage_claim):
result = {} result = {}
for property in storage_properties: result['saps'] = []
if property = 'uv': for s in storage_claim['saps']:
result.update(CreateCorrelated(otdb_id, storage_properties)) properties = {}
if property = 'cs': for p in s['properties']:
result.update(CreateCoherentStokes(otdb_id, storage_properties)) properties[p['type_name']] = p['value']
if property = 'is': result['saps'].append({'sap_nr' : s['sap_nr'], 'properties': properties})
result.update(CreateIncoherentStokes(otdb_id, storage_properties)) for p in storage_claim['properties']:
if property = 'im': result[p['type_name']] = p['value']
result.update(CreateInstrumentModel(otdb_id, storage_properties)) return result
if property = 'img':
result.update(CreateSkyImage(otdb_id, storage_properties))
if property = 'pulp':
result.update(CreatePulsarPipeline(otdb_id, storage_properties))
def CreateParset(self, otdb_id, ra_info): def CreateParset(self, otdb_id, ra_info):
logger.info('CreateParset: start=%s, end=%s' % (ra_info['starttime'], ra_info['endtime'])) logger.info('CreateParset: start=%s, end=%s' % (ra_info['starttime'], ra_info['endtime']))
...@@ -146,7 +180,8 @@ class RAtoOTDBTranslator(): ...@@ -146,7 +180,8 @@ class RAtoOTDBTranslator():
parset[PREFIX+'stopTime'] = ra_info['endtime'].strftime('%Y-%m-%d %H:%M:%S') parset[PREFIX+'stopTime'] = ra_info['endtime'].strftime('%Y-%m-%d %H:%M:%S')
if 'storage' in ra_info: if 'storage' in ra_info:
parset.update(CreateStorageKeys(ra_info['storage']['properties'])) logging.debug(ra_info['storage'])
parset.update(self.CreateStorageKeys(otdb_id, self.parseStorageProperties(ra_info['storage'])))
if 'stations' in ra_info: if 'stations' in ra_info:
parset[PREFIX+'VirtualInstrument.stationList'] = ra_info["stations"] parset[PREFIX+'VirtualInstrument.stationList'] = ra_info["stations"]
return parset return parset
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment