diff --git a/lib/feedback.py b/lib/feedback.py index b33d8cf743cb6bfd6349d7856de3d937cf97a601..1472f8a6c5ab978758e89b178ce2e7737e762fd4 100644 --- a/lib/feedback.py +++ b/lib/feedback.py @@ -1,11 +1,270 @@ +#!/usr/bin/python -from lofar.lta.sip import siplib as sip +import sys +import pprint +import siplib +import constants +from ast import literal_eval +import datetime +import copy #fill in lots of methods/classes etc which can parse feedback files, and uses the siplib api to create a sip +class Feedback(): + + def __init__(self, feedback): + self.__inputstrings = feedback + self.__tree = {} + print "parsing",len(feedback),"lines of feedback" + for line in feedback: + if line.strip() and not line.startswith("#"): + key, value = line.split('=') + t = self.__tree + if value.strip(): + for item in key.split('.')[:-1]: + #if not item == "ObsSW" and not item == "Observation" and not item == "DataProducts": //todo: check if the hierarchy can/should be flattened + t = t.setdefault(item, {}) + + try: + t[key.split('.')[-1]] = value.strip().replace("\"","") + except: + t[key.split('.')[-1]] = value.strip() + + #pprint.pprint(self.__tree) + + def __get_basic_sip(self, dataproduct): + campaign = self.__tree.get("ObsSW").get("Observation").get("Campaign") #todo: check whether this is always available + + # todo: ! Figure out what feedback exactly users have available and fix this! + # Possible solution: The specific dataproduct that shall be archived with this SIP has to be provided as an argument + # Another solution: SIPs for all dataproducts are generated in one go. This should be the ones in self.__tree.get("ObsSW").get("Observation").get("Dataproducts") + # !!! This is a dummy! + + sip = siplib.Sip( + project_code=campaign.get("name"), + project_primaryinvestigator=campaign.get("PI"), + project_contactauthor=campaign.get("contact"), + project_description=campaign.get("title"), + dataproduct=dataproduct, + project_coinvestigators=[campaign.get("CO_I")] + ) + + return sip + + def __convert_iso(self, td): + # determine duration in ISO format (couldn't find a nice lib for it) + y,w,d,h,m,s = td.days/365, (td.days/7)%365, (td.days/7)%7, td.seconds/3600, (td.seconds/60)%60, td.seconds%60 + duration = 'P{}Y{}M{}DT{}H{}M{}S'.format(y,w,d,h,m,s) + return duration + + + def get_dataproduct_sips(self): + + print "Generating SIPs for all dataproducts" + + obs = self.__tree.get("ObsSW").get("Observation") + dps = self.__tree.get("Observation").get("DataProducts") + campaign = self.__tree.get("ObsSW").get("Observation").get("Campaign") + + antennaset = obs.get("antennaSet").split("_")[0]+" "+obs.get("antennaSet").split("_")[1].title() + antennafields = obs.get("antennaArray").split(";") + stations = [] + y = obs.get("VirtualInstrument").get("stationList").replace("[","").replace("]","").split(",") + for x in y: + stations.append(siplib.Station.preconfigured(str(x),antennafields)) + + + # determine duration in ISO format (couldn't find a nice lib for it) + td= (datetime.datetime.strptime(obs.get("stopTime"),"%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(obs.get("startTime"),"%Y-%m-%d %H:%M:%S")) + duration = self.__convert_iso(td) + + + #---optional items: + # todo: if ... else set None + correlatorprocessing=siplib.CorrelatorProcessing( + integrationinterval=0.5, + integrationinterval_unit="ns", + channelwidth_frequency=160, + channelwidth_frequencyunit="MHz" + ), + coherentstokesprocessing=siplib.CoherentStokesProcessing( + rawsamplingtime=20, + rawsamplingtime_unit="ns", + timesamplingdownfactor=2, + samplingtime=10, + samplingtime_unit="ns", + stokes=["XX"], + numberofstations=1, + stations=[siplib.Station.preconfigured("CS002",["HBA0","HBA1"])], + frequencydownsamplingfactor=2, + numberofcollapsedchannels=2, + channelwidth_frequency=160, + channelwidth_frequencyunit="MHz", + channelspersubband=122 + ), + incoherentstokesprocessing=siplib.IncoherentStokesProcessing( + rawsamplingtime=20, + rawsamplingtime_unit="ns", + timesamplingdownfactor=2, + samplingtime=10, + samplingtime_unit="ns", + stokes=["XX"], + numberofstations=1, + stations=[siplib.Station.preconfigured("CS003",["HBA0","HBA1"])], + frequencydownsamplingfactor=2, + numberofcollapsedchannels=2, + channelwidth_frequency=160, + channelwidth_frequencyunit="MHz", + channelspersubband=122 + ), + flyseyeprocessing=siplib.FlysEyeProcessing( + rawsamplingtime=10, + rawsamplingtime_unit="ms", + timesamplingdownfactor=2, + samplingtime=2, + samplingtime_unit="ms", + stokes=["I"], + ), + nonstandardprocessing=siplib.NonStandardProcessing( + channelwidth_frequency=160, + channelwidth_frequencyunit="MHz", + channelspersubband=122 + ) + #--- + + sips = None + for dp in (v for k,v in dps.items() if k.startswith("Output_Beamformed_[")): + + print "Creating SIP for", dp.get("filename") + + if not sips: + sips = {} + + pointings = None + for key in (k for k,v in obs.items() if k.startswith("Beam[")): + beam = obs.get(key) + + if not pointings: + pointings=[] + + point=siplib.PointingAltAz( #todo: check if always azel pointing or check on "directionType" + az_angle=beam.get("angle1"), + az_angleunit=constants.ANGLEUNIT_RADIANS, + alt_angle=beam.get("angle2"), + alt_angleunit=constants.ANGLEUNIT_RADIANS, + equinox=constants.EQUINOXTYPE_J2000, #beam.get("directionType") # todo: Is this the right value? + ) + #todo elif the thousand other directionType options... conversion needed? + + if beam.get("startTime"): + starttime = beam.get("startTime").replace(" ","T") #todo: add to obs starttime ?! + else: + starttime = obs.get("startTime").replace(" ","T") + + if beam.get("duration") == "0": + dur = duration + else: + dur = int(beam.get("duration")) + + pointings.append( + siplib.SubArrayPointing( + pointing = point, + beamnumber=key.split("[")[1].split("]")[0], + subarraypointingidentifier=beam.get("momID"), + subarraypointingidentifier_source="MoM", + measurementtype=constants.MEASUREMENTTYPE_TARGET, # todo + targetname=beam.get("target"), + starttime=starttime, + duration=dur, + numberofprocessing=1, # todo + numberofcorrelateddataproducts=2, # todo + numberofbeamformeddataproducts=1, # todo + relations=[siplib.ProcessRelation( + identifier_source="source", # todo + identifier="90")], # todo + #todo: optional kwargs + ) + ) + + + # create SIP document + sip = self.__get_basic_sip( + siplib.BeamFormedDataProduct( + siplib.DataProductMap( + type=constants.DATAPRODUCTTYPE_BEAM_FORMED_DATA, + source="space", # todo + identifier="42", # todo + size=dp.get("size"), + filename=dp.get("filename"), + fileformat=dp.get("fileFormat"), + processsource="someone gave it to me", # todo + processid="SIPlib 0.1", # todo + #checksum_md5="hash1", # todo + #checksum_adler32="hash2", # todo + #storageticket="ticket" # todo + ) + # todo: beams + ) + ) + tbbevents=None #["event1","event2"] #todo + + # add the observation for this dataproduct + sip.add_observation(observingmode=obs.get("processSubtype"), + instrumentfilter=obs.get("bandFilter")[4:].replace("_","-") +" MHz", + clock_frequency=int(obs.get("sampleClock")), #obs.get("clockMode")[-3:] + clock_frequencyunit=constants.FREQUENCYUNIT_MHZ, + stationselection=constants.STATIONSELECTIONTYPE_CORE, # todo + antennaset=antennaset, + timesystem=constants.TIMESYSTEMTYPE_UTC, # todo + stations=stations, + numberofstations=len(stations), + numberofsubarraypointings=5, #todo + numberoftbbevents=5, #todo + numberofcorrelateddataproducts=dps.get("nrOfOutput_Correlated_"), + numberofbeamformeddataproducts=dps.get("nrOfOutput_Beamformed_"), + numberofbitspersample=5, # todo + process_map= + siplib.ProcessMap( + strategyname=obs.get("strategy"), + strategydescription="awesome strategy", # todo + starttime=obs.get("startTime").replace(" ","T"), + duration= duration, + observation_source="SAS", + observation_id="SAS VIC Tree Id", # todo + process_source="MoM", + process_id="momid", #obs.get(some_beam).get("momID"), # todo + #parset_source="parsource", # todo + #parset_id="parid", #todo, + relations=[siplib.ProcessRelation( + identifier_source="groupid source", #todo + identifier="groupid")] #todo + ), + observationdescription=campaign.get("title"), #todo + #channelwidth_frequency=160, + #channelwidth_frequencyunit="MHz", + #channelspersubband=5, + subarraypointings=pointings + #transientbufferboardevents=tbbevents #todo + ) + sips[dp.get("filename")] = sip + return sips + + + def bla(self): + pass + + #expose a main method in this lib module which can be used by external programs #or by the bin/feedback2sip 'program' -def main(): +def main(argv): #do the proper calls to the feedback/SIP API # parse cmdline args etc - print 'hello world!' + with open(argv[1]) as f: + print argv[1] + text = f.readlines() + feedback = Feedback(text) + sips = feedback.get_dataproduct_sips() + print sips.values()[0].get_prettyxml() + +if __name__ == '__main__': + main(sys.argv) \ No newline at end of file diff --git a/lib/siplib.py b/lib/siplib.py index 7c2d6afdf8416e31465dbd0280d2744bea677ea3..9cabb5f126ca8e76909b513fc7ff88429cc55e62 100644 --- a/lib/siplib.py +++ b/lib/siplib.py @@ -31,8 +31,8 @@ import constants import os VERSION = "SIPlib 0.1" -STATION_CONFIG_PATH = "/home/jkuensem/dev/SIP-lib/SIPlib-Task9091/LTA/sip/lib/station_coordinates.conf" # todo fix path for cmake tests -# STATION_CONFIG_PATH = os.path.expanduser("station_coordinates.conf") +#STATION_CONFIG_PATH = "/home/jkuensem/dev/SIP-lib/SIPlib-Task9091/LTA/sip/lib/station_coordinates.conf" # todo fix path for cmake tests +STATION_CONFIG_PATH = os.path.expanduser("station_coordinates.conf") # todo: create docstrings for everything. # specify types and explain purpose of the field (-> ask someone with more astronomical background) @@ -77,7 +77,7 @@ class Station(): with open(STATION_CONFIG_PATH, 'r') as f: for line in f.readlines(): if line.strip(): - field_coords = eval("dict("+line+")") + field_coords = eval("dict("+line+")") # literal_eval does not accept dict definition via constructor. Make sure config file is not writable to prevent code execution! for type in antennafieldtypes: if field_coords["name"] == name+"_"+type: __afield=AntennafieldXYZ( @@ -537,13 +537,15 @@ class BeamFormedDataProduct(): beams=None): __beams=None + __nbeams=0 if beams: __beams=ltasip.ArrayBeams() + __nbeams = len(beams) for beam in beams: __beams.append(beam.get_pyxb_beam()) self.__pyxb_dataproduct=ltasip.BeamFormedDataProduct( - numberOfBeams=len(beams), + numberOfBeams=__nbeams, beams=__beams, **dataproduct_map.get_dict()) @@ -1170,7 +1172,7 @@ class Sip(): for station in stations: __stations.append(station.get_pyxb_station()) - __tbbevents=None, + __tbbevents=None if(transientbufferboardevents): __tbbevents = ltasip.TransientBufferBoardEvents() for source in transientbufferboardevents: diff --git a/lib/visualize.py b/lib/visualize.py new file mode 100755 index 0000000000000000000000000000000000000000..c9e3ff79a4f711bdeb6ffc3a1dc836ed9fe67cd9 --- /dev/null +++ b/lib/visualize.py @@ -0,0 +1,160 @@ +#!/usr/bin/python + +from graphviz import Digraph +import sys +import siplib +import ltasip + +def visualize_sip(sip): + + linkstodataproduct = {} + linkstoprocess = {} + + dot_wrapper = Digraph('cluster_wrapper') + + # --- + # create legend + dot_legend = Digraph('cluster_legend') # graphviz needs a label starting with cluster to render styles, oh boy... + dot_legend.body.append('style=filled') + dot_legend.body.append('bgcolor=lightgrey') + dot_legend.body.append('label="Legend\n\n"') + + dot_legend.node('A',"Described Dataproduct",style="filled",fillcolor="cadetblue", shape="note") + dot_legend.node('B',"Related Dataproduct",style="filled",fillcolor="cadetblue2", shape="note") + dot_legend.node('C',"Observation", style="filled", fillcolor="gold",shape="octagon") + dot_legend.node('D',"Pipeline/Process ",style="filled",fillcolor="chartreuse", shape="cds") + dot_legend.node('E', "Unspec. Process", style="filled", fillcolor="orange", shape="hexagon") + dot_legend.edge('A','B',color="invis") + dot_legend.edge('B','C',color="invis") + dot_legend.edge('C','D',color="invis") + dot_legend.edge('D','E',color="invis") + + # --- + # create the actual sip graph + dot = Digraph('cluster_sip') + dot.body.append('style=filled') + dot.body.append('bgcolor=lightgrey') + dot.body.append('label = "'+str(sip.project.projectCode+" - "+sip.project.projectDescription)+'\n\n"') + + # the dataproduct that is described by the sip + data_out = sip.dataProduct + id_out = data_out.dataProductIdentifier.identifier + dot.node(id_out, id_out +": "+data_out.fileName,style="filled",fillcolor="cadetblue", shape="note") + id_process = data_out.processIdentifier.identifier + # keep reference to originating pipeline run / observation: + linkstodataproduct.setdefault(id_out,[]).append(id_process) + + # the input / intermediate dataproducts + for data_in in sip.relatedDataProduct: + id_in = data_in.dataProductIdentifier.identifier + dot.node(id_in, id_in +": "+data_in.fileName, style="filled", shape="note",fillcolor="cadetblue2") + id_process = data_in.processIdentifier.identifier + # keep reference to originating pipeline run / observation: + linkstodataproduct.setdefault(id_in,[]).append(id_process) + + # the observations + for obs in sip.observation: + id_obs = obs.observationId.identifier + id_process = obs.processIdentifier.identifier + dot.node(id_obs, id_obs + ": "+ id_process, style="filled", fillcolor="gold",shape="octagon") + # no incoming data here, but register node as present: + linkstoprocess.setdefault(id_obs,[]) + + # the data processing steps + for pipe in sip.pipelineRun: + id_pipe = pipe.processIdentifier.identifier + dot.node(id_pipe, id_pipe+" ", style="filled", fillcolor="chartreuse", shape="cds") + # keep reference to input dataproducts: + id_in = [] + for id in pipe.sourceData: + id_in.append(id.identifier) + linkstoprocess.setdefault(id_pipe,[]).append(id_in) + + # the data processing steps + for unspec in sip.unspecifiedProcess: + id_unspec = unspec.processIdentifier.identifier + dot.node(id_unspec, id_unspec, style="filled", fillcolor="orange", shape="hexagon") + # no incoming data here, but register node as present: + linkstoprocess.setdefault(id_unspec,[]) + + + # todo: online processing + # todo: parsets (?) + + print linkstoprocess + print linkstodataproduct + + # add edges: + for id in linkstodataproduct: + for id_from in linkstodataproduct.get(id): + if id_from in linkstoprocess: + dot.edge(id_from, id) + else: + print "Error: The pipeline or observation that created dataproduct '"+ id + "' seems to be missing! -> ", id_from + + for id in linkstoprocess: + for id_from in linkstoprocess.get(id): + if id_from in linkstodataproduct: + dot.edge(id_from, id) + else: + print "Error: The input dataproduct for pipeline '"+ id +"' seems to be missing! -> ", id_from + + + # ---- + # render graph: + dot_wrapper.subgraph(dot_legend) + dot_wrapper.subgraph(dot) + dot_wrapper = stylize(dot_wrapper) + dot_wrapper.render('sip.gv', view=True) + + + + + + +def stylize(graph): + styles = { + 'graph': { + 'fontname': 'Helvetica', + 'fontsize': '18', + 'fontcolor': 'grey8', + 'bgcolor': 'grey90', + 'rankdir': 'TB', + }, + 'nodes': { + 'fontname': 'Helvetica', + 'fontcolor': 'grey8', + 'color': 'grey8', + }, + 'edges': { + 'arrowhead': 'open', + 'fontname': 'Courier', + 'fontsize': '12', + 'fontcolor': 'grey8', + } + } + + graph.graph_attr.update( + ('graph' in styles and styles['graph']) or {} + ) + graph.node_attr.update( + ('nodes' in styles and styles['nodes']) or {} + ) + graph.edge_attr.update( + ('edges' in styles and styles['edges']) or {} + ) + return graph + + + + +def main(argv): + print "Reading xml from file", argv[1] + with open(argv[1]) as f: + xml = f.read() + sip = ltasip.CreateFromDocument(xml) + visualize_sip(sip) + + +if __name__ == '__main__': + main(sys.argv) \ No newline at end of file