Skip to content
Snippets Groups Projects
Commit d9d7a94e authored by Jörn Künsemöller's avatar Jörn Künsemöller
Browse files

Task #9091 - refactored class structure

parent 1820f468
Branches
Tags
No related merge requests found
#!/usr/bin/python
# This module provides functions for easy creation of a Lofar LTA SIP document.
# It builds upon a Pyxb-generated API from the schema definition, which is very clever but hard to use, since
# the arguments in class constructors and functions definitions are not verbose and there is no intuitive way
# to determine the mandatory and optional elements to create a valid SIP document. This module is designed to
# provide easy-to-use functions that bridges this shortcoming of the Pyxb API.
#
# Usage: Import module. Create an instance of Sip. Add elements through the Sip.add_X functions.
# Many of these functions require instances of other classes of the module.
#
#
# Note on validation: From construction through every addition, the SIP should remain valid (or throw an error
# that clearly points out where e.g. a given value does not meet the restrictions of the SIP schema.
#
# Note on code structure: This has to be seen as a compromise between elegant and maintainable code with well
# structured inheritance close to the schema definition on the one hand, and something more straightforward to use,
# with flatter hierarchies on the other hand.
#
# Note on parameter maps: The ...Map objects are helper objects to create dictionaries for the commonly used
# constructor arguments of several other objects. This could alternatively also be implemented via inheritance from
# a supertype, and indeed is solved like this in the pyxb code. However, this then requires the use of an argument
# list pointer, which hides the list of required and optional arguments from the user. Alternatively, all arguments
# have to be mapped in all constructors repeatedly, creating lots of boilerplate code. This is the nicest approach
# I could think of that keeps the whole thing reasonably maintainable.
import ltasip
import pyxb
VERSION = "SIPlib 0.1"
# todo: Find a nice way how to access permitted types by the XSD restrictions
# To get list of permitted values, e.g. do: print ltasip.StationTypeType.values()
# But maybe we want to define these as constants in here?
# todo: find a nice way to define the stations.
# Probably create a Station class for this
# We should prepare a dictionary with all present
# ===============================
# Station definitions:
# todo: This is preliminary! (see above)
# create station
afield = ltasip.AntennaField(name="HBA", location=ltasip.Coordinates("ITRF2005",ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m")))
afield2 = ltasip.AntennaField(name="LBA", location=ltasip.Coordinates("ITRF2005",ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m")))
stationA = ltasip.Station(name="Bielefeld",stationType=ltasip.StationTypeType("International"))
stationA.antennaField.append(afield)
stationA.antennaField.append(afield2)
# create station
afield = ltasip.AntennaField(name="HBA", location=ltasip.Coordinates("ITRF2005",ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m")))
afield2 = ltasip.AntennaField(name="LBA", location=ltasip.Coordinates("ITRF2005",ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m")))
stationB = ltasip.Station(name="Dwingeloo",stationType=ltasip.StationTypeType("Core"))
stationB.antennaField.append(afield)
stationB.antennaField.append(afield2)
# create station dict:
STATIONS = dict(stationA=stationA,
stationB=stationB)
######################################### end station definitions
# ==============
# Processes:
class ProcessMap():
#todo: relations
def __init__(self,
strategyname,
strategydescription,
starttime,
duration,
observation_source,
observation_id,
process_source,
process_id,
parset_source=None,
parset_id=None,
):
self.process_map = dict(processIdentifier=ltasip.IdentifierType(source=process_source, identifier=process_id),
observationId=ltasip.IdentifierType(source=observation_source, identifier=observation_id),
relations=ltasip.ProcessRelations().append(
ltasip.ProcessRelation(relationType=ltasip.ProcessRelationType("GroupID"),
identifier=ltasip.IdentifierType(source="somesource",
identifier="relationID"))),
strategyName=strategyname, strategyDescription=strategydescription, startTime=starttime,
duration=duration)
if parset_id and parset_source:
self.process_map["parset"]=ltasip.IdentifierType(source=parset_source,identifier=parset_id)
def get_dict(self):
return self.process_map
class SimpleProcess():
def __init__(self, process_map):
self.pyxb_process = ltasip.Process(**process_map.get_dict())
def get_pyxb_process(self):
return self.pyxb_process
# ============
# PipelineRuns:
class PipelineMap():
def __init__(self,
name,
version,
sourcedata, #= type="DataSources"/>
process_map
):
pass
# todo
# ==========
# Dataproducts:
# ============
def create_dataproduct_mapping(
class DataProductMap():
def __init__(self,
type,
source,
identifier,
......@@ -23,7 +140,7 @@ def create_dataproduct_mapping(
checksum_adler32=None,
storageticket=None
):
dataproduct_map= dict(dataProductType=type,
self.dataproduct_map= dict(dataProductType=type,
dataProductIdentifier=ltasip.IdentifierType(source=source, identifier=identifier), size=size,
fileName=filename, fileFormat=fileformat,
processIdentifier=ltasip.IdentifierType(source=processsource, identifier=processid),
......@@ -34,95 +151,299 @@ def create_dataproduct_mapping(
checksums = checksums + [ltasip.ChecksumType(algorithm="MD5", value_=checksum_md5)]
if checksum_adler32 is not None:
checksums = checksums + [ltasip.ChecksumType(algorithm="Adler32", value_=checksum_adler32)]
dataproduct_map["checksum"]=checksums
self.dataproduct_map["checksum"]=checksums
return dataproduct_map
def get_dict(self):
return self.dataproduct_map
class SimpleDataProduct():
def __init__(self, dataproduct_map):
self.__pyxb_dataproduct = ltasip.DataProduct(**dataproduct_map.get_dict())
def get_pyxb_dataproduct(self):
return self.__pyxb_dataproduct
def create_simpledataproduct(dataproduct_map):
return ltasip.DataProduct(**dataproduct_map)
class SkyModelDataProduct():
def __init__(self, dataproduct_map):
self.__pyxb_dataproduct = ltasip.SkyModelDataProduct(**dataproduct_map.get_dict())
def get_pyxb_dataproduct(self):
return self.__pyxb_dataproduct
class GenericDataProduct():
def __init__(self, dataproduct_map):
self.__pyxb_dataproduct = ltasip.GenericDataProduct(**dataproduct_map.get_dict())
def get_pyxb_dataproduct(self):
return self.__pyxb_dataproduct
def create_skymodeldataproduct(dataproduct_map):
return ltasip.SkyModelDataProduct(**dataproduct_map)
class UnspecifiedDataProduct():
def __init__(self, dataproduct_map):
self.__pyxb_dataproduct = ltasip.UnspecifiedDataProduct(**dataproduct_map.get_dict())
def get_pyxb_dataproduct(self):
return self.__pyxb_dataproduct
class PixelMapDataProduct():
def __init__(self,
dataproduct_map,
numberofaxes,
coordinates):
# ==========
# Processes:
#=========
self.__pyxb_dataproduct = ltasip.PixelMapDataProduct(
numberOfAxes=numberofaxes,
numberOfCoordinates=len(coordinates),
coordinates = [x.get_pyxb_coordinate() for x in coordinates],
**dataproduct_map.get_dict())
def get_pyxb_dataproduct(self):
return self.__pyxb_dataproduct
#todo: relations
def create_process_mapping(
strategyname,
strategydescription,
startTime,
class SkyImageDataProduct():
def __init__(self,
dataproduct_map,
numberofaxes,
coordinates,
locationframe,
timeframe,
observationpointing,
restoringbeammajor,
restoringbeamminor,
rmsnoise):
self.__pyxb_dataproduct = ltasip.SkyImageDataProduct(
numberOfAxes=numberofaxes,
numberOfCoordinates=len(coordinates),
coordinate = [x.get_pyxb_coordinate() for x in coordinates],
locationFrame=locationframe,
timeFrame=timeframe,
observationPointing=observationpointing,
restoringBeamMajor=restoringbeammajor,
restoringBeamMinor=restoringbeamminor,
rmsNoise=rmsnoise,
**dataproduct_map.get_dict())
def get_pyxb_dataproduct(self):
return self.__pyxb_dataproduct
class CorrelatedDataProduct():
def __init__(self,
dataproduct_map,
subarraypointing_source,
subarraypointing_identifier,
subband,
starttime,
duration,
observation_source,
observation_id,
process_source,
process_id,
parset_source=None,
parset_id=None,
integrationinterval,
integrationintervalunit,
central_frequency,
central_frequencyunit,
channelwidth_frequency,
channelwidth_frequencyunit,
channelspersubband,
stationsubband=None):
self.__pyxb_dataproduct=ltasip.CorrelatedDataProduct(
subArrayPointingIdentifier=ltasip.IdentifierType(source=subarraypointing_source, identifier=subarraypointing_identifier),
centralFrequency=ltasip.Frequency(central_frequency, units=central_frequencyunit),
channelWidth=ltasip.Frequency(channelwidth_frequency, units=channelwidth_frequencyunit),
subband=subband,
startTime=starttime,
duration=duration,
integrationInterval=ltasip.Time(integrationinterval,units=integrationintervalunit),
channelsPerSubband=channelspersubband,
stationSubband=stationsubband,
**dataproduct_map.get_dict()
)
def get_pyxb_dataproduct(self):
return self.__pyxb_dataproduct
class InstrumentModelDataProduct():
def __init__(self, dataproduct_map):
self.__pyxb_dataproduct = ltasip.InstrumentModelDataProduct(**dataproduct_map.get_dict())
def get_pyxb_dataproduct(self):
return self.__pyxb_dataproduct
class TransientBufferBoardDataProduct():
def __init__(self, dataproduct_map):
self.__pyxb_dataproduct = ltasip.TransientBufferBoardDataProduct(**dataproduct_map.get_dict())
def get_pyxb_dataproduct(self):
return self.__pyxb_dataproduct
class BeamFormedDataProduct():
def __init__(self,
dataproduct_map,
numberofsamples,
timestamp,
triggertype,
triggervalue):
self.__pyxb_dataproduct = ltasip.BeamFormedDataProduct(
numberOfSamples=numberofsamples,
timeStamp=timestamp,
triggerParameters=ltasip.TBBTrigger(type=triggertype,value=triggervalue),
**dataproduct_map.get_dict())
def get_pyxb_dataproduct(self):
return self.__pyxb_dataproduct
class PulpSummaryDataProduct():
def __init__(self,
dataproduct_map,
filecontent,
datatype):
self.__pyxb_dataproduct = ltasip.PulpSummaryDataProduct(
fileContent=filecontent,
dataType=datatype,
**dataproduct_map.get_dict())
def get_pyxb_dataproduct(self):
return self.__pyxb_dataproduct
class PulpDataProduct():
def __init__(self,
dataproduct_map,
filecontent,
datatype,
arrayBeam):
self.__pyxb_dataproduct = ltasip.PulpDataProduct(
fileContent=filecontent,
dataType=datatype,
**dataproduct_map.get_dict())
def get_pyxb_dataproduct(self):
return self.__pyxb_dataproduct
# ============
# Coordinates:
class SpectralCoordinateLinear():
def __init__(self,
quantity_type,
quantity_value,
linearaxis,
):
self.__pyxb_coordinate=ltasip.SpectralCoordinate(
spectralLinearAxis=linearaxis.get_pyxb_axis(),
spectralQuantity=ltasip.SpectralQuantity(value_=quantity_value, type=quantity_type)
)
process_map = dict(processIdentifier=ltasip.IdentifierType(source=process_source, identifier=process_id),
observationId=ltasip.IdentifierType(source=observation_source, identifier=observation_id),
relations=ltasip.ProcessRelations().append(
ltasip.ProcessRelation(relationType=ltasip.ProcessRelationType("GroupID"),
identifier=ltasip.IdentifierType(source="somesource",
identifier="relationID"))),
strategyName=strategyname, strategyDescription=strategydescription, startTime=startTime,
duration=duration)
def get_pyxb_coordinate(self):
return self.__pyxb_coordinate
if parset_id and parset_source:
process_map["parset"]=ltasip.IdentifierType(source=parset_source,identifier=parset_id)
return process_map
class SpectralCoordinateTabular():
def __init__(self,
quantity_type,
quantity_value,
tabularaxis,
):
def create_simpleprocess(process_map):
return ltasip.Process(**process_map)
self.__pyxb_coordinate=ltasip.SpectralCoordinate(
spectralTabularAxis=tabularaxis.get_pyxb_axis(),
spectralQuantity=ltasip.SpectralQuantity(value_=quantity_value, type=quantity_type)
)
def get_pyxb_coordinate(self):
return self.__pyxb_coordinate
class TimeCoordinateLinear():
def __init__(self,
equinox,
linearaxis,
):
self.__pyxb_coordinate=ltasip.TimeCoordinate(
timeLinearAxis=linearaxis.get_pyxb_axis(),
equinox=equinox
)
# =========
# Stations:
# ========
def get_pyxb_coordinate(self):
return self.__pyxb_coordinate
#todo find more compact definition
# create station
afield = ltasip.AntennaField(name="HBA", location=ltasip.Coordinates("ITRF2005",ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m")))
afield2 = ltasip.AntennaField(name="LBA", location=ltasip.Coordinates("ITRF2005",ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m")))
stationA = ltasip.Station(name="Bielefeld",stationType=ltasip.StationTypeType("International"))
stationA.antennaField.append(afield)
stationA.antennaField.append(afield2)
#todo find more compact definition
# create station
afield = ltasip.AntennaField(name="HBA", location=ltasip.Coordinates("ITRF2005",ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m")))
afield2 = ltasip.AntennaField(name="LBA", location=ltasip.Coordinates("ITRF2005",ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m"),ltasip.Length(2.0, units="m")))
stationB = ltasip.Station(name="Dwingeloo",stationType=ltasip.StationTypeType("Core"))
stationB.antennaField.append(afield)
stationB.antennaField.append(afield2)
class TimeCoordinateTabular():
def __init__(self,
equinox,
tabularaxis,
):
# create station dict:
STATIONS = dict(stationA=stationA,
stationB=stationB)
self.__pyxb_coordinate=ltasip.TimeCoordinate(
timeTabularAxis=tabularaxis.get_pyxb_axis(),
equinox=equinox
)
def get_pyxb_coordinate(self):
return self.__pyxb_coordinate
class DirectionCoordinate():
def __init__(linearaxis_a,
linearaxis_b,
PC0_0,
PC0_1,
PC1_0,
PC1_1,
equinox,
raDecSystem,
projection,
projectionParameters,
longitudePole,
latitudePole):
pass
#todo
class LinearAxis():
def __init__(self,
number,
name,
units,
length,
increment,
referencepixel,
referencevalue):
self.__pyxb_axis= ltasip.LinearAxis(
number=number,
name=name,
units=units,
length=length,
increment=increment,
referencePixel=referencepixel,
referenceValue=referencevalue
)
def get_pyxb_axis(self):
return self.__pyxb_axis
class TabularAxis():
def __init__(self,
number,
name,
units,
length
):
self.__pyxb_axis=ltasip.TabularAxis(
number=number,
name=name,
units=units,
length=length,
)
def get_pyxb_axis(self):
return self.__pyxb_axis
# ============
# SIP document
# ============
class Sip():
......@@ -154,7 +475,9 @@ class Sip():
coInvestigator=project_coinvestigators,
)
self.sip.dataProduct=dataproduct
self.sip.dataProduct=dataproduct.get_pyxb_dataproduct()
self.get_prettyxml() # for validation
#-------------
......@@ -162,7 +485,8 @@ class Sip():
#---
def add_related_dataproduct(self, dataproduct):
self.sip.relatedDataProduct.append(dataproduct)
self.sip.relatedDataProduct.append(dataproduct.get_pyxb_dataproduct())
return self.get_prettyxml()
# todo: subarraypointings, transientbufferboardevents
def add_observation(self,
......@@ -212,15 +536,16 @@ class Sip():
channelsPerSubband=channelspersubband,
subArrayPointings=None, #" type="SubArrayPointings"/>
transientBufferBoardEvents=None, #" type="TransientBufferBoardEvents"/>
**process_map
**process_map.get_dict()
)
self.sip.observation.append(obs)
return self.get_prettyxml()
def add_pipelinerun(self):
pass
return self.get_prettyxml()
# todo relations
......@@ -233,10 +558,12 @@ class Sip():
up = ltasip.UnspecifiedProcess(
observingMode=observingmode,
description=description,
**process_map
**process_map.get_dict()
)
self.sip.unspecifiedProcess.append(up)
return self.get_prettyxml()
def add_parset(self,
source,
......@@ -248,25 +575,30 @@ class Sip():
contents=contents
))
return self.get_prettyxml()
def prettyprint(self):
def get_prettyxml(self):
try:
print self.sip.toDOM().toprettyxml()
return self.sip.toDOM().toprettyxml()
except pyxb.ValidationError as err:
#print str(err)
print err.details()
raise err
def prettyprint(self):
print self.get_prettyxml()
# ======================================================= end SIP class
########################################################### end SIP class
#expose a main method in this lib module which can be used by external programs
#or by the bin/generatesip 'program'
def main():
#do the proper calls to the SIP API
# parse cmdline args etc
# todo: This should be moved to testing, but makes it easier for me now.
# create example doc with mandatory attributes
mysip = Sip(project_code="code",
......@@ -275,8 +607,8 @@ def main():
project_telescope="LOFAR",
project_description="awesome project",
project_coinvestigators=["sidekick1", "sidekick2"],
dataproduct=create_simpledataproduct(
create_dataproduct_mapping(
dataproduct=SimpleDataProduct(
DataProductMap(
type="Unknown",
source="space",
identifier="42",
......@@ -287,7 +619,9 @@ def main():
processid="SIPlib 0.1",
checksum_md5="hash1",
checksum_adler32= "hash2",
storageticket="ticket"))
storageticket="ticket"
)
)
)
# add optional attributes
......@@ -299,8 +633,8 @@ def main():
# add optional dataproduct item
mysip.add_related_dataproduct(
create_skymodeldataproduct(
create_dataproduct_mapping(
GenericDataProduct(
DataProductMap(
type="Unknown",
source="space",
identifier="fourtytwo",
......@@ -316,6 +650,95 @@ def main():
print "===\nAdded related dataproduct:\n"
mysip.prettyprint()
# add optional dataproduct item
mysip.add_related_dataproduct(
SkyImageDataProduct(
DataProductMap(
type="Unknown",
source="space",
identifier="fourtytwo",
size=2048,
filename="/home/paulus/test.h5",
fileformat="HDF5",
processsource="someone gave it to me",
processid="SIPlib 0.1"
),
numberofaxes=2,
coordinates=[
SpectralCoordinateLinear(
quantity_type="Frequency",
quantity_value=20.0,
linearaxis=LinearAxis(
number=5,
name="bla",
units="parsec",
length=5,
increment=5,
referencepixel=7.5,
referencevalue=7.4)),
SpectralCoordinateTabular(
quantity_type="Frequency",
quantity_value=20.0,
tabularaxis=TabularAxis(
number=5,
name="bla",
units="parsec",
length=5,
))
],
locationframe="GEOCENTER",
timeframe="timeframe",
observationpointing=ltasip.Pointing(
rightAscension=ltasip.Angle(2.0, units="degrees"),
#azimuth=ltasip.Angle(),
declination=ltasip.Angle(2.0, units="degrees"),
#altitude=ltasip.Angle(),
equinox="SUN"),
restoringbeammajor=ltasip.Angle(2.0, units="degrees"),
restoringbeamminor=ltasip.Angle(2.0, units="degrees"),
rmsnoise=ltasip.Pixel("1.0", units="Jy/beam")
)
)
print "===\nAdded another related dataproduct:\n"
mysip.prettyprint()
# add optional dataproduct item
mysip.add_related_dataproduct(
CorrelatedDataProduct(
DataProductMap(
type="Unknown",
source="space",
identifier="fourtytwo",
size=2048,
filename="/home/paulus/test.h5",
fileformat="HDF5",
processsource="someone gave it to me",
processid="SIPlib 0.1"
),
subarraypointing_source="source",
subarraypointing_identifier="id",
subband="1",
starttime="1980-03-23T10:20:15",
duration= "P6Y3M10DT15H",
integrationinterval=10,
integrationintervalunit="ms",
central_frequency=160,
central_frequencyunit="MHz",
channelwidth_frequency=200,
channelwidth_frequencyunit="MHz",
channelspersubband=122,
stationsubband=2,
)
)
print "===\nAdded yet another related dataproduct:\n"
mysip.prettyprint()
# add optional observation item
mysip.add_observation(observingmode="Interferometer",
instrumentfilter="10-70 MHz",
......@@ -332,10 +755,10 @@ def main():
numberofbeamformeddataproducts=5,
numberofbitspersample=5,
process_map=
create_process_mapping(
ProcessMap(
strategyname="strategy1",
strategydescription="awesome strategy",
startTime="1980-03-23T10:20:15",
starttime="1980-03-23T10:20:15",
duration= "P6Y3M10DT15H",
observation_source="SAS",
observation_id="SAS VIC Tree Id",
......@@ -364,10 +787,10 @@ def main():
observingmode="Interferometer",
description="unspecified",
process_map=
create_process_mapping(
ProcessMap(
strategyname="strategy1",
strategydescription="awesome strategy",
startTime="1980-03-23T10:20:15",
starttime="1980-03-23T10:20:15",
duration= "P6Y3M10DT15H",
observation_source="SAS",
observation_id="SAS VIC Tree Id",
......@@ -381,8 +804,6 @@ def main():
mysip.prettyprint()
# to get list of permitted values, e.g.:
# print ltasip.StationTypeType.values()
if __name__ == '__main__':
main()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment