Commit 4519e5e2 authored by Jörn Künsemöller

Task #9091 - Working basic graph visualization

parent 761c56f8
#!/usr/bin/python
import sys
import pprint
import siplib
import constants
from ast import literal_eval
import datetime
import copy
#fill in lots of methods/classes etc which can parse feedback files, and uses the siplib api to create a sip
class Feedback():
def __init__(self, feedback):
self.__inputstrings = feedback
self.__tree = {}
print "parsing",len(feedback),"lines of feedback"
for line in feedback:
if line.strip() and not line.startswith("#"):
key, value = line.split('=', 1)  # split on the first '=' only; values may themselves contain '='
t = self.__tree
if value.strip():
for item in key.split('.')[:-1]:
#if not item == "ObsSW" and not item == "Observation" and not item == "DataProducts": //todo: check if the hierarchy can/should be flattened
t = t.setdefault(item, {})
t[key.split('.')[-1]] = value.strip().replace("\"","")  # store leaf value, stripping surrounding quotes
#pprint.pprint(self.__tree)
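# Illustrative example of the parsed tree (key and value are made up, not taken from a
# real feedback file): a line like
#   ObsSW.Observation.Campaign.name=LC0_042
# ends up as
#   self.__tree["ObsSW"]["Observation"]["Campaign"]["name"] == "LC0_042"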
def __get_basic_sip(self, dataproduct):
campaign = self.__tree.get("ObsSW").get("Observation").get("Campaign") #todo: check whether this is always available
# todo: ! Figure out what feedback exactly users have available and fix this!
# Possible solution: The specific dataproduct that shall be archived with this SIP has to be provided as an argument
# Another solution: SIPs for all dataproducts are generated in one go. This should be the ones in self.__tree.get("ObsSW").get("Observation").get("Dataproducts")
# !!! This is a dummy!
sip = siplib.Sip(
project_code=campaign.get("name"),
project_primaryinvestigator=campaign.get("PI"),
project_contactauthor=campaign.get("contact"),
project_description=campaign.get("title"),
dataproduct=dataproduct,
project_coinvestigators=[campaign.get("CO_I")]
)
return sip
def __convert_iso(self, td):
# determine duration in ISO format (couldn't find a nice lib for it)
# note: years and months are approximated as 365 and 30 days respectively
days, secs = td.days, td.seconds
y, days = days // 365, days % 365
mo, d = days // 30, days % 30
h, m, s = secs // 3600, (secs // 60) % 60, secs % 60
duration = 'P{}Y{}M{}DT{}H{}M{}S'.format(y, mo, d, h, m, s)
return duration
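# Worked example (illustrative values): datetime.timedelta(days=2, hours=1, minutes=30)
# yields 'P0Y0M2DT1H30M0S'.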
def get_dataproduct_sips(self):
print "Generating SIPs for all dataproducts"
obs = self.__tree.get("ObsSW").get("Observation")
dps = self.__tree.get("Observation").get("DataProducts")
campaign = self.__tree.get("ObsSW").get("Observation").get("Campaign")
antennaset = obs.get("antennaSet").split("_")[0]+" "+obs.get("antennaSet").split("_")[1].title()
antennafields = obs.get("antennaArray").split(";")
stations = []
y = obs.get("VirtualInstrument").get("stationList").replace("[","").replace("]","").split(",")
for x in y:
stations.append(siplib.Station.preconfigured(str(x),antennafields))
# determine duration in ISO format (couldn't find a nice lib for it)
td= (datetime.datetime.strptime(obs.get("stopTime"),"%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(obs.get("startTime"),"%Y-%m-%d %H:%M:%S"))
duration = self.__convert_iso(td)
#---optional items:
# todo: if ... else set None
correlatorprocessing=siplib.CorrelatorProcessing(
integrationinterval=0.5,
integrationinterval_unit="ns",
channelwidth_frequency=160,
channelwidth_frequencyunit="MHz"
)
coherentstokesprocessing=siplib.CoherentStokesProcessing(
rawsamplingtime=20,
rawsamplingtime_unit="ns",
timesamplingdownfactor=2,
samplingtime=10,
samplingtime_unit="ns",
stokes=["XX"],
numberofstations=1,
stations=[siplib.Station.preconfigured("CS002",["HBA0","HBA1"])],
frequencydownsamplingfactor=2,
numberofcollapsedchannels=2,
channelwidth_frequency=160,
channelwidth_frequencyunit="MHz",
channelspersubband=122
)
incoherentstokesprocessing=siplib.IncoherentStokesProcessing(
rawsamplingtime=20,
rawsamplingtime_unit="ns",
timesamplingdownfactor=2,
samplingtime=10,
samplingtime_unit="ns",
stokes=["XX"],
numberofstations=1,
stations=[siplib.Station.preconfigured("CS003",["HBA0","HBA1"])],
frequencydownsamplingfactor=2,
numberofcollapsedchannels=2,
channelwidth_frequency=160,
channelwidth_frequencyunit="MHz",
channelspersubband=122
)
flyseyeprocessing=siplib.FlysEyeProcessing(
rawsamplingtime=10,
rawsamplingtime_unit="ms",
timesamplingdownfactor=2,
samplingtime=2,
samplingtime_unit="ms",
stokes=["I"],
)
nonstandardprocessing=siplib.NonStandardProcessing(
channelwidth_frequency=160,
channelwidth_frequencyunit="MHz",
channelspersubband=122
)
#---
sips = None
for dp in (v for k,v in dps.items() if k.startswith("Output_Beamformed_[")):
print "Creating SIP for", dp.get("filename")
if not sips:
sips = {}
pointings = None
for key in (k for k,v in obs.items() if k.startswith("Beam[")):
beam = obs.get(key)
if not pointings:
pointings=[]
point=siplib.PointingAltAz( #todo: check if always azel pointing or check on "directionType"
az_angle=beam.get("angle1"),
az_angleunit=constants.ANGLEUNIT_RADIANS,
alt_angle=beam.get("angle2"),
alt_angleunit=constants.ANGLEUNIT_RADIANS,
equinox=constants.EQUINOXTYPE_J2000, #beam.get("directionType") # todo: Is this the right value?
)
#todo elif the thousand other directionType options... conversion needed?
if beam.get("startTime"):
starttime = beam.get("startTime").replace(" ","T") #todo: add to obs starttime ?!
else:
starttime = obs.get("startTime").replace(" ","T")
if beam.get("duration") == "0":
dur = duration
else:
dur = int(beam.get("duration"))
pointings.append(
siplib.SubArrayPointing(
pointing = point,
beamnumber=key.split("[")[1].split("]")[0],
subarraypointingidentifier=beam.get("momID"),
subarraypointingidentifier_source="MoM",
measurementtype=constants.MEASUREMENTTYPE_TARGET, # todo
targetname=beam.get("target"),
starttime=starttime,
duration=dur,
numberofprocessing=1, # todo
numberofcorrelateddataproducts=2, # todo
numberofbeamformeddataproducts=1, # todo
relations=[siplib.ProcessRelation(
identifier_source="source", # todo
identifier="90")], # todo
#todo: optional kwargs
)
)
# create SIP document
sip = self.__get_basic_sip(
siplib.BeamFormedDataProduct(
siplib.DataProductMap(
type=constants.DATAPRODUCTTYPE_BEAM_FORMED_DATA,
source="space", # todo
identifier="42", # todo
size=dp.get("size"),
filename=dp.get("filename"),
fileformat=dp.get("fileFormat"),
processsource="someone gave it to me", # todo
processid="SIPlib 0.1", # todo
#checksum_md5="hash1", # todo
#checksum_adler32="hash2", # todo
#storageticket="ticket" # todo
)
# todo: beams
)
)
tbbevents=None #["event1","event2"] #todo
# add the observation for this dataproduct
sip.add_observation(observingmode=obs.get("processSubtype"),
instrumentfilter=obs.get("bandFilter")[4:].replace("_","-") +" MHz",
clock_frequency=int(obs.get("sampleClock")), #obs.get("clockMode")[-3:]
clock_frequencyunit=constants.FREQUENCYUNIT_MHZ,
stationselection=constants.STATIONSELECTIONTYPE_CORE, # todo
antennaset=antennaset,
timesystem=constants.TIMESYSTEMTYPE_UTC, # todo
stations=stations,
numberofstations=len(stations),
numberofsubarraypointings=5, #todo
numberoftbbevents=5, #todo
numberofcorrelateddataproducts=dps.get("nrOfOutput_Correlated_"),
numberofbeamformeddataproducts=dps.get("nrOfOutput_Beamformed_"),
numberofbitspersample=5, # todo
process_map=
siplib.ProcessMap(
strategyname=obs.get("strategy"),
strategydescription="awesome strategy", # todo
starttime=obs.get("startTime").replace(" ","T"),
duration= duration,
observation_source="SAS",
observation_id="SAS VIC Tree Id", # todo
process_source="MoM",
process_id="momid", #obs.get(some_beam).get("momID"), # todo
#parset_source="parsource", # todo
#parset_id="parid", #todo,
relations=[siplib.ProcessRelation(
identifier_source="groupid source", #todo
identifier="groupid")] #todo
),
observationdescription=campaign.get("title"), #todo
#channelwidth_frequency=160,
#channelwidth_frequencyunit="MHz",
#channelspersubband=5,
subarraypointings=pointings
#transientbufferboardevents=tbbevents #todo
)
sips[dp.get("filename")] = sip
return sips
def bla(self):
pass
#expose a main method in this lib module which can be used by external programs
#or by the bin/feedback2sip 'program'
def main(argv):
#do the proper calls to the feedback/SIP API
# parse cmdline args etc
with open(argv[1]) as f:
print argv[1]
text = f.readlines()
feedback = Feedback(text)
sips = feedback.get_dataproduct_sips()
print sips.values()[0].get_prettyxml()
if __name__ == '__main__':
main(sys.argv)
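# Example invocation (the script and feedback file names are hypothetical):
#   python feedback2sip.py Observation123456_feedback
# This parses the feedback file, builds one SIP per beamformed dataproduct and
# prints the pretty-printed XML of the first one.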
@@ -31,8 +31,8 @@ import constants
 import os

 VERSION = "SIPlib 0.1"
-STATION_CONFIG_PATH = "/home/jkuensem/dev/SIP-lib/SIPlib-Task9091/LTA/sip/lib/station_coordinates.conf" # todo fix path for cmake tests
-# STATION_CONFIG_PATH = os.path.expanduser("station_coordinates.conf")
+#STATION_CONFIG_PATH = "/home/jkuensem/dev/SIP-lib/SIPlib-Task9091/LTA/sip/lib/station_coordinates.conf" # todo fix path for cmake tests
+STATION_CONFIG_PATH = os.path.expanduser("station_coordinates.conf")

 # todo: create docstrings for everything.
 # specify types and explain purpose of the field (-> ask someone with more astronomical background)
@@ -77,7 +77,7 @@ class Station():
         with open(STATION_CONFIG_PATH, 'r') as f:
             for line in f.readlines():
                 if line.strip():
-                    field_coords = eval("dict("+line+")")
+                    field_coords = eval("dict("+line+")") # literal_eval does not accept dict definition via constructor. Make sure config file is not writable to prevent code execution!
                     for type in antennafieldtypes:
                         if field_coords["name"] == name+"_"+type:
                             __afield=AntennafieldXYZ(
@@ -537,13 +537,15 @@ class BeamFormedDataProduct():
                  beams=None):
         __beams=None
+        __nbeams=0
         if beams:
             __beams=ltasip.ArrayBeams()
+            __nbeams = len(beams)
             for beam in beams:
                 __beams.append(beam.get_pyxb_beam())
         self.__pyxb_dataproduct=ltasip.BeamFormedDataProduct(
-            numberOfBeams=len(beams),
+            numberOfBeams=__nbeams,
             beams=__beams,
             **dataproduct_map.get_dict())
@@ -1170,7 +1172,7 @@ class Sip():
         for station in stations:
             __stations.append(station.get_pyxb_station())
-        __tbbevents=None,
+        __tbbevents=None
         if(transientbufferboardevents):
             __tbbevents = ltasip.TransientBufferBoardEvents()
             for source in transientbufferboardevents:
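The comment added to the eval() line above notes that ast.literal_eval cannot parse the dict(...) constructor syntax used in the station coordinates file. A minimal eval-free sketch, assuming each config line is a flat list of comma-separated key=value pairs with quoted strings and plain numbers (the line format and field values shown are assumptions, not taken from the actual station_coordinates.conf):

import ast

def parse_station_config_line(line):
    # Turn a line like 'name="CS002_HBA0", x=3826600.9, y=460953.4, z=5064881.1'
    # (illustrative values) into a dict literal that ast.literal_eval can safely evaluate.
    pairs = [item.split("=", 1) for item in line.split(",")]
    literal = "{" + ", ".join("%r: %s" % (k.strip(), v.strip()) for k, v in pairs) + "}"
    return ast.literal_eval(literal)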
#!/usr/bin/python
from graphviz import Digraph
import sys
import siplib
import ltasip
def visualize_sip(sip):
linkstodataproduct = {}
linkstoprocess = {}
dot_wrapper = Digraph('cluster_wrapper')
# ---
# create legend
dot_legend = Digraph('cluster_legend') # graphviz needs a label starting with cluster to render styles, oh boy...
dot_legend.body.append('style=filled')
dot_legend.body.append('bgcolor=lightgrey')
dot_legend.body.append('label="Legend\n\n"')
dot_legend.node('A',"Described Dataproduct",style="filled",fillcolor="cadetblue", shape="note")
dot_legend.node('B',"Related Dataproduct",style="filled",fillcolor="cadetblue2", shape="note")
dot_legend.node('C',"Observation", style="filled", fillcolor="gold",shape="octagon")
dot_legend.node('D',"Pipeline/Process ",style="filled",fillcolor="chartreuse", shape="cds")
dot_legend.node('E', "Unspec. Process", style="filled", fillcolor="orange", shape="hexagon")
dot_legend.edge('A','B',color="invis")
dot_legend.edge('B','C',color="invis")
dot_legend.edge('C','D',color="invis")
dot_legend.edge('D','E',color="invis")
# ---
# create the actual sip graph
dot = Digraph('cluster_sip')
dot.body.append('style=filled')
dot.body.append('bgcolor=lightgrey')
dot.body.append('label = "'+str(sip.project.projectCode+" - "+sip.project.projectDescription)+'\n\n"')
# the dataproduct that is described by the sip
data_out = sip.dataProduct
id_out = data_out.dataProductIdentifier.identifier
dot.node(id_out, id_out +": "+data_out.fileName,style="filled",fillcolor="cadetblue", shape="note")
id_process = data_out.processIdentifier.identifier
# keep reference to originating pipeline run / observation:
linkstodataproduct.setdefault(id_out,[]).append(id_process)
# the input / intermediate dataproducts
for data_in in sip.relatedDataProduct:
id_in = data_in.dataProductIdentifier.identifier
dot.node(id_in, id_in +": "+data_in.fileName, style="filled", shape="note",fillcolor="cadetblue2")
id_process = data_in.processIdentifier.identifier
# keep reference to originating pipeline run / observation:
linkstodataproduct.setdefault(id_in,[]).append(id_process)
# the observations
for obs in sip.observation:
id_obs = obs.observationId.identifier
id_process = obs.processIdentifier.identifier
dot.node(id_obs, id_obs + ": "+ id_process, style="filled", fillcolor="gold",shape="octagon")
# no incoming data here, but register node as present:
linkstoprocess.setdefault(id_obs,[])
# the data processing steps
for pipe in sip.pipelineRun:
id_pipe = pipe.processIdentifier.identifier
dot.node(id_pipe, id_pipe+" ", style="filled", fillcolor="chartreuse", shape="cds")
# keep reference to input dataproducts:
id_in = []
for id in pipe.sourceData:
id_in.append(id.identifier)
linkstoprocess.setdefault(id_pipe,[]).extend(id_in)  # extend, not append: keep a flat list of input dataproduct ids
# the unspecified processes
for unspec in sip.unspecifiedProcess:
id_unspec = unspec.processIdentifier.identifier
dot.node(id_unspec, id_unspec, style="filled", fillcolor="orange", shape="hexagon")
# no incoming data here, but register node as present:
linkstoprocess.setdefault(id_unspec,[])
# todo: online processing
# todo: parsets (?)
print linkstoprocess
print linkstodataproduct
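# Illustrative contents of the two maps (identifiers are made up): with one
# observation feeding a pipeline that produced the described dataproduct,
#   linkstodataproduct = {'dp_out': ['pipe_1'], 'dp_raw': ['obs_1']}
#   linkstoprocess     = {'obs_1': [], 'pipe_1': ['dp_raw']}
# which yields the edges obs_1 -> dp_raw -> pipe_1 -> dp_out below.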
# add edges:
for id in linkstodataproduct:
for id_from in linkstodataproduct.get(id):
if id_from in linkstoprocess:
dot.edge(id_from, id)
else:
print "Error: The pipeline or observation that created dataproduct '"+ id + "' seems to be missing! -> ", id_from
for id in linkstoprocess:
for id_from in linkstoprocess.get(id):
if id_from in linkstodataproduct:
dot.edge(id_from, id)
else:
print "Error: The input dataproduct for pipeline '"+ id +"' seems to be missing! -> ", id_from
# ----
# render graph:
dot_wrapper.subgraph(dot_legend)
dot_wrapper.subgraph(dot)
dot_wrapper = stylize(dot_wrapper)
dot_wrapper.render('sip.gv', view=True)
def stylize(graph):
styles = {
'graph': {
'fontname': 'Helvetica',
'fontsize': '18',
'fontcolor': 'grey8',
'bgcolor': 'grey90',
'rankdir': 'TB',
},
'nodes': {
'fontname': 'Helvetica',
'fontcolor': 'grey8',
'color': 'grey8',
},
'edges': {
'arrowhead': 'open',
'fontname': 'Courier',
'fontsize': '12',
'fontcolor': 'grey8',
}
}
graph.graph_attr.update(
('graph' in styles and styles['graph']) or {}
)
graph.node_attr.update(
('nodes' in styles and styles['nodes']) or {}
)
graph.edge_attr.update(
('edges' in styles and styles['edges']) or {}
)
return graph
def main(argv):
print "Reading xml from file", argv[1]
with open(argv[1]) as f:
xml = f.read()
sip = ltasip.CreateFromDocument(xml)
visualize_sip(sip)
if __name__ == '__main__':
main(sys.argv)
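# Example invocation (the script and SIP file names are hypothetical):
#   python visualize_sip.py my_sip.xml
# This parses the SIP XML via ltasip, builds the dataproduct/process graph and
# renders it to sip.gv / sip.gv.pdf with graphviz, opening the result (view=True).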