Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
siplib.py 46.17 KiB
#!/usr/bin/python
# This module provides functions for easy creation of a Lofar LTA SIP document.
# It builds upon a Pyxb-generated API from the schema definition, which is very clever but hard to use, since
# the arguments in class constructors and functions definitions are not verbose and there is no intuitive way
# to determine the mandatory and optional elements to create a valid SIP document. This module is designed to
# provide easy-to-use functions that bridge this shortcoming of the Pyxb API.
#
# Usage: Import module. Create an instance of Sip.
# Add elements through the Sip.add_X functions. Many require instances of other classes of the module.
# Call get_prettyxml() and e.g. save the result to disk.
#
# Note on validation: From construction through every addition, the SIP should remain valid (or throw an error
# that clearly points out where e.g. a given value does not meet the restrictions of the SIP schema).
#
# Note on code structure: This has to be seen as a compromise between elegant and maintainable code with well-
# structured inheritance close to the schema definition on the one hand, and something more straightforward to use,
# with flatter hierarchies on the other hand.
#
# Note on parameter maps: The ...Map objects are helper objects to create dictionaries for the commonly used
# constructor arguments of several other objects. This could alternatively also be implemented via inheritance from
# a supertype, and indeed is solved like this in the pyxb code. However, this then requires the use of an argument
# list pointer, which hides the list of required and optional arguments from the user. Alternatively, all arguments
# have to be mapped in all constructors repeatedly, creating lots of boilerplate code. This is the nicest approach
# I could think of that keeps the whole thing reasonably maintainable AND usable.
import ltasip
import pyxb
import constants
import os
import xml.dom.minidom
from pyxb.namespace import XMLSchema_instance as xsi
from pyxb.namespace import XMLNamespaces as xmlns
# Generator version string; Sip.__init__ stores it in sipGeneratorVersion.
VERSION = "SIPlib 0.3"
# Directory containing this module (symlinks resolved via realpath).
d = os.path.dirname(os.path.realpath(__file__))
# Station coordinate file, expected next to this module; read by Station.preconfigured().
STATION_CONFIG_PATH = d+'/station_coordinates.conf'
# Register 'sip' as the prefix of the ltasip namespace.
ltasip.Namespace.setPrefix('sip')
# todo: create docstrings for everything.
# specify types and explain purpose of the field (-> ask someone with more astronomical background)
# todo: check what fields can be implicitly set
# (e.g. parameter type in dataproduct may be derived from the specific dataproduct class that is used)
# Some parameters may also be filled with a reasonable default value. Right now, usually only optional values
# as per schema definition are optional parameters.
# ===============================
# Identifier definition (used for LTA entities, i.e. processes and dataproducts):
class Identifier():
    """Wrapper around an ``ltasip.IdentifierType``.

    source -- passed on as ``source``
    id     -- passed on as ``identifier``
    name   -- optional, passed on as ``name``
    """

    def __init__(self, source, id, name=None):
        # Gather the pyxb keywords first, then build the binding object.
        fields = dict(source=source, identifier=id, name=name)
        self.__pyxb_identifier = ltasip.IdentifierType(**fields)

    def get_pyxb_identifier(self):
        """Return the wrapped pyxb identifier object."""
        return self.__pyxb_identifier
# ===============================
# Station definitions:
class Station():
def __init__(self,
name,
type,
antennafield1,
antennafield2=None
):
__afields=[antennafield1.get_pyxb_antennafield()]
if antennafield2:
__afields.append(antennafield2.get_pyxb_antennafield())
self.__pyxb_station=ltasip.Station(
name=name,
stationType=type,
antennaField=__afields
)
@classmethod
def preconfigured(cls,
name,
antennafieldtypes
):
if antennafieldtypes is None or len(antennafieldtypes)>2:
raise Exception("please specify a list with one or two antennafield types for station:",name)
__afield1=None
__afield2=None
print os.path.abspath(STATION_CONFIG_PATH)
with open(STATION_CONFIG_PATH, 'r') as f:
for line in f.readlines():
if line.strip():
field_coords = eval("dict("+line+")") # literal_eval does not accept dict definition via constructor. Make sure config file is not writable to prevent code execution!
for type in antennafieldtypes:
if field_coords["name"] == name+"_"+type:
__afield=AntennafieldXYZ(
type=type,
coordinate_system=field_coords["coordinate_system"],
coordinate_unit=constants.LENGTHUNIT_M, # Does this make sense? I have to give a lenght unit accoridng to the XSD, but ICRF should be decimal degrees?!
coordinate_x=field_coords["x"],
coordinate_y=field_coords["y"],
coordinate_z=field_coords["z"])
if not __afield1:
__afield1=__afield
elif not __afield2:
__afield2=__afield
if not __afield1:
raise Exception("no matching coordinates found for station:", name,"and fields",str(antennafieldtypes))
if name.startswith( 'CS' ):
sttype = "Core"
elif name.startswith( "RS" ):
sttype = "Remote"
else:
sttype = "International"
return cls(
name=name,
type=sttype,
antennafield1=__afield1,
antennafield2=__afield2,
)
def get_pyxb_station(self):
return self.__pyxb_station
class AntennafieldXYZ():
    """Antenna field whose location is given as x/y/z lengths."""

    def __init__(self, type, coordinate_system,
                 coordinate_x, coordinate_y, coordinate_z,
                 coordinate_unit):
        # All three components share the same unit.
        x_length = ltasip.Length(coordinate_x, units=coordinate_unit)
        y_length = ltasip.Length(coordinate_y, units=coordinate_unit)
        z_length = ltasip.Length(coordinate_z, units=coordinate_unit)
        position = ltasip.Coordinates(
            coordinateSystem=coordinate_system,
            x=x_length,
            y=y_length,
            z=z_length,
        )
        self.__pyxb_antennafield = ltasip.AntennaField(name=type, location=position)

    def get_pyxb_antennafield(self):
        """Return the wrapped pyxb antenna field object."""
        return self.__pyxb_antennafield
class AntennafieldRadLonLat():
    """Antenna field whose location is given as radius, longitude and latitude."""

    def __init__(self, type, coordinate_system,
                 coordinate_radius, coordinate_radiusunit,
                 coordinate_longitude, coordinate_latitude, coordinate_lonlatunit,
                 ):
        radius = ltasip.Length(coordinate_radius, units=coordinate_radiusunit)
        # Longitude and latitude share one angle unit.
        longitude = ltasip.Angle(coordinate_longitude, units=coordinate_lonlatunit)
        latitude = ltasip.Angle(coordinate_latitude, units=coordinate_lonlatunit)
        position = ltasip.Coordinates(
            coordinateSystem=coordinate_system,
            radius=radius,
            longitude=longitude,
            latitude=latitude,
        )
        self.__pyxb_antennafield = ltasip.AntennaField(name=type, location=position)

    def get_pyxb_antennafield(self):
        """Return the wrapped pyxb antenna field object."""
        return self.__pyxb_antennafield
# ============
# PipelineRuns:
class PipelineMap():
    """Collects the constructor keywords that all pipeline classes share.

    name                   -- passed on as ``pipelineName``
    version                -- passed on as ``pipelineVersion``
    sourcedata_identifiers -- iterable of objects providing get_pyxb_identifier()
    process_map            -- object providing get_dict(); its entries are merged in
    """

    def __init__(self, name, version, sourcedata_identifiers, process_map):
        sources = ltasip.DataSources()
        for identifier in sourcedata_identifiers:
            sources.append(identifier.get_pyxb_identifier())

        self.pipeline_map = {
            "pipelineName": name,
            "pipelineVersion": version,
            "sourceData": sources,
        }
        # Process keywords are merged last, so they win on a key clash.
        self.pipeline_map.update(process_map.get_dict())

    def get_dict(self):
        """Return the keyword dictionary."""
        return self.pipeline_map
class SimplePipeline():
    """Wraps a plain ``ltasip.PipelineRun`` built from a pipeline map."""

    def __init__(self, pipeline_map):
        # pipeline_map must provide get_dict() with the constructor keywords.
        keywords = pipeline_map.get_dict()
        self.__pyxb_pipeline = ltasip.PipelineRun(**keywords)

    def get_pyxb_pipeline(self):
        """Return the wrapped pyxb pipeline object."""
        return self.__pyxb_pipeline
class ImagingPipeline():
    """Wraps an ``ltasip.ImagingPipeline``.

    The imager integration time is given as value plus unit; the last four
    parameters are optional. All other keywords come from pipeline_map.
    """

    def __init__(self, pipeline_map,
                 imagerintegrationtime, imagerintegrationtime_unit,
                 numberofmajorcycles,
                 numberofinstrumentmodels,
                 numberofcorrelateddataproducts,
                 numberofskyimages,
                 frequencyintegrationstep=None,
                 timeintegrationstep=None,
                 skymodeldatabase=None,
                 demixing=None):
        integration_time = ltasip.Time(imagerintegrationtime, units=imagerintegrationtime_unit)
        shared = pipeline_map.get_dict()
        self.__pyxb_pipeline = ltasip.ImagingPipeline(
            imagerIntegrationTime=integration_time,
            numberOfMajorCycles=numberofmajorcycles,
            numberOfInstrumentModels=numberofinstrumentmodels,
            numberOfCorrelatedDataProducts=numberofcorrelateddataproducts,
            numberOfSkyImages=numberofskyimages,
            frequencyIntegrationStep=frequencyintegrationstep,
            timeIntegrationStep=timeintegrationstep,
            skyModelDatabase=skymodeldatabase,
            demixing=demixing,
            **shared
        )

    def get_pyxb_pipeline(self):
        """Return the wrapped pyxb pipeline object."""
        return self.__pyxb_pipeline
class CalibrationPipeline():
    """Wraps an ``ltasip.CalibrationPipeline``.

    The last four parameters are optional; the shared pipeline keywords come
    from pipeline_map.
    """

    def __init__(self, pipeline_map,
                 skymodeldatabase,
                 numberofinstrumentmodels,
                 numberofcorrelateddataproducts,
                 frequencyintegrationstep=None,
                 timeintegrationstep=None,
                 flagautocorrelations=None,
                 demixing=None):
        shared = pipeline_map.get_dict()
        self.__pyxb_pipeline = ltasip.CalibrationPipeline(
            skyModelDatabase=skymodeldatabase,
            numberOfInstrumentModels=numberofinstrumentmodels,
            numberOfCorrelatedDataProducts=numberofcorrelateddataproducts,
            frequencyIntegrationStep=frequencyintegrationstep,
            timeIntegrationStep=timeintegrationstep,
            flagAutoCorrelations=flagautocorrelations,
            demixing=demixing,
            **shared
        )

    def get_pyxb_pipeline(self):
        """Return the wrapped pyxb pipeline object."""
        return self.__pyxb_pipeline
class AveragingPipeline():
    """Wraps an ``ltasip.AveragingPipeline``; all parameters are mandatory."""

    def __init__(self, pipeline_map,
                 numberofcorrelateddataproducts,
                 frequencyintegrationstep,
                 timeintegrationstep,
                 flagautocorrelations,
                 demixing):
        shared = pipeline_map.get_dict()
        self.__pyxb_pipeline = ltasip.AveragingPipeline(
            numberOfCorrelatedDataProducts=numberofcorrelateddataproducts,
            frequencyIntegrationStep=frequencyintegrationstep,
            timeIntegrationStep=timeintegrationstep,
            flagAutoCorrelations=flagautocorrelations,
            demixing=demixing,
            **shared
        )
        # Left disabled by the original author:
        #self.__pyxb_pipeline._setAttribute("xsi:type","ns1:AveragingPipeline")

    def get_pyxb_pipeline(self):
        """Return the wrapped pyxb pipeline object."""
        return self.__pyxb_pipeline
class PulsarPipeline():
    """Wraps an ``ltasip.PulsarPipeline``.

    The subintegration length is given as value plus unit; every other
    parameter is passed on under the matching pyxb keyword.
    """

    def __init__(self, pipeline_map,
                 pulsarselection,
                 pulsars,
                 dosinglepulseanalysis,
                 convertRawTo8bit,
                 subintegrationlength, subintegrationlength_unit,
                 skiprfiexcision,
                 skipdatafolding,
                 skipoptimizepulsarprofile,
                 skipconvertrawintofoldedpsrfits,
                 runrotationalradiotransientsanalysis,
                 skipdynamicspectrum,
                 skipprefold):
        sub_length = ltasip.Time(subintegrationlength, units=subintegrationlength_unit)
        shared = pipeline_map.get_dict()
        self.__pyxb_pipeline = ltasip.PulsarPipeline(
            pulsarSelection=pulsarselection,
            pulsars=pulsars,
            doSinglePulseAnalysis=dosinglepulseanalysis,
            convertRawTo8bit=convertRawTo8bit,
            subintegrationLength=sub_length,
            skipRFIExcision=skiprfiexcision,
            skipDataFolding=skipdatafolding,
            skipOptimizePulsarProfile=skipoptimizepulsarprofile,
            skipConvertRawIntoFoldedPSRFITS=skipconvertrawintofoldedpsrfits,
            runRotationalRAdioTransientsAnalysis=runrotationalradiotransientsanalysis,
            skipDynamicSpectrum=skipdynamicspectrum,
            skipPreFold=skipprefold,
            **shared
        )

    def get_pyxb_pipeline(self):
        """Return the wrapped pyxb pipeline object."""
        return self.__pyxb_pipeline
class CosmicRayPipeline():
    """Wraps an ``ltasip.CosmicRayPipeline`` built from a pipeline map."""

    def __init__(self, pipeline_map):
        keywords = pipeline_map.get_dict()
        self.__pyxb_pipeline = ltasip.CosmicRayPipeline(**keywords)

    def get_pyxb_pipeline(self):
        """Return the wrapped pyxb pipeline object."""
        return self.__pyxb_pipeline
class LongBaselinePipeline():
    """Wraps an ``ltasip.LongBaselinePipeline``."""

    def __init__(self, pipeline_map, subbandspersubbandgroup, subbandgroupspermS):
        shared = pipeline_map.get_dict()
        self.__pyxb_pipeline = ltasip.LongBaselinePipeline(
            subbandGroupsPerMS=subbandgroupspermS,
            subbandsPerSubbandGroup=subbandspersubbandgroup,
            **shared
        )

    def get_pyxb_pipeline(self):
        """Return the wrapped pyxb pipeline object."""
        return self.__pyxb_pipeline
class GenericPipeline():
    """Wraps an ``ltasip.GenericPipeline`` built from a pipeline map."""

    def __init__(self, pipeline_map):
        keywords = pipeline_map.get_dict()
        self.__pyxb_pipeline = ltasip.GenericPipeline(**keywords)

    def get_pyxb_pipeline(self):
        """Return the wrapped pyxb pipeline object."""
        return self.__pyxb_pipeline
# ==========
# Dataproducts:
class DataProductMap():
    """Collects the constructor keywords that all dataproduct classes share.

    identifier and process_identifier must provide get_pyxb_identifier().
    The "checksum" entry is a list holding one ltasip.ChecksumType per given
    checksum (MD5 first, then Adler32); it is empty if none is given.
    """

    def __init__(self, type, identifier, size, filename, fileformat,
                 process_identifier,
                 checksum_md5=None,
                 checksum_adler32=None,
                 storageticket=None
                 ):
        entries = {
            "dataProductType": type,
            "dataProductIdentifier": identifier.get_pyxb_identifier(),
            "size": size,
            "fileName": filename,
            "fileFormat": fileformat,
            "processIdentifier": process_identifier.get_pyxb_identifier(),
            "storageTicket": storageticket,
        }
        # Only checksums that were actually supplied end up in the list.
        supplied = (("MD5", checksum_md5), ("Adler32", checksum_adler32))
        entries["checksum"] = [
            ltasip.ChecksumType(algorithm=algorithm, value_=digest)
            for algorithm, digest in supplied
            if digest is not None
        ]
        self.dataproduct_map = entries

    def get_dict(self):
        """Return the keyword dictionary."""
        return self.dataproduct_map
class __DataProduct(object):
    """Common base of the dataproduct wrappers; holds the pyxb dataproduct."""

    def __init__(self, pyxb_dataproduct):
        self.__pyxb_dataproduct = pyxb_dataproduct

    def get_pyxb_dataproduct(self):
        """Return the wrapped pyxb dataproduct object."""
        return self.__pyxb_dataproduct

    def set_identifier(self, identifier):
        """Replace dataProductIdentifier with the given identifier's pyxb object."""
        pyxb_id = identifier.get_pyxb_identifier()
        self.__pyxb_dataproduct.dataProductIdentifier = pyxb_id

    def set_process_identifier(self, identifier):
        """Replace processIdentifier with the given identifier's pyxb object."""
        pyxb_id = identifier.get_pyxb_identifier()
        self.__pyxb_dataproduct.processIdentifier = pyxb_id

    def set_subarraypointing_identifier(self, identifier):
        """Replace subArrayPointingIdentifier with the given identifier's pyxb object."""
        pyxb_id = identifier.get_pyxb_identifier()
        self.__pyxb_dataproduct.subArrayPointingIdentifier = pyxb_id
class SimpleDataProduct(__DataProduct):
    """Dataproduct wrapper around a plain ``ltasip.DataProduct``."""

    def __init__(self, dataproduct_map):
        pyxb_product = ltasip.DataProduct(**dataproduct_map.get_dict())
        super(SimpleDataProduct, self).__init__(pyxb_product)
class SkyModelDataProduct(__DataProduct):
    """Dataproduct wrapper around an ``ltasip.SkyModelDataProduct``."""

    def __init__(self, dataproduct_map):
        pyxb_product = ltasip.SkyModelDataProduct(**dataproduct_map.get_dict())
        super(SkyModelDataProduct, self).__init__(pyxb_product)
class GenericDataProduct(__DataProduct):
    """Dataproduct wrapper around an ``ltasip.GenericDataProduct``."""

    def __init__(self, dataproduct_map):
        pyxb_product = ltasip.GenericDataProduct(**dataproduct_map.get_dict())
        super(GenericDataProduct, self).__init__(pyxb_product)
class UnspecifiedDataProduct(__DataProduct):
    """Dataproduct wrapper around an ``ltasip.UnspecifiedDataProduct``."""

    def __init__(self, dataproduct_map):
        pyxb_product = ltasip.UnspecifiedDataProduct(**dataproduct_map.get_dict())
        super(UnspecifiedDataProduct, self).__init__(pyxb_product)
class PixelMapDataProduct(__DataProduct):
    """Dataproduct wrapper around an ``ltasip.PixelMapDataProduct``.

    coordinates -- sequence of objects providing get_pyxb_coordinate();
                   its length is passed on as ``numberOfCoordinates``
    """

    def __init__(self, dataproduct_map, numberofaxes, coordinates):
        coordinate_count = len(coordinates)
        pyxb_coordinates = [entry.get_pyxb_coordinate() for entry in coordinates]
        shared = dataproduct_map.get_dict()
        pyxb_product = ltasip.PixelMapDataProduct(
            numberOfAxes=numberofaxes,
            numberOfCoordinates=coordinate_count,
            coordinate=pyxb_coordinates,
            **shared
        )
        super(PixelMapDataProduct, self).__init__(pyxb_product)
class SkyImageDataProduct(__DataProduct):
    """Dataproduct wrapper around an ``ltasip.SkyImageDataProduct``.

    coordinates         -- sequence of objects providing get_pyxb_coordinate()
    observationpointing -- object providing get_pyxb_pointing()
    rmsnoise            -- stored as ltasip.Pixel with units "Jy/beam"
    The restoring beam axes are given as angle value plus unit.
    """

    def __init__(self, dataproduct_map, numberofaxes, coordinates,
                 locationframe, timeframe, observationpointing,
                 restoringbeammajor_angle, restoringbeammajor_angleunit,
                 restoringbeamminor_angle, restoringbeamminor_angleunit,
                 rmsnoise):
        coordinate_count = len(coordinates)
        pyxb_coordinates = [entry.get_pyxb_coordinate() for entry in coordinates]
        pointing = observationpointing.get_pyxb_pointing()
        beam_major = ltasip.Angle(restoringbeammajor_angle, units=restoringbeammajor_angleunit)
        beam_minor = ltasip.Angle(restoringbeamminor_angle, units=restoringbeamminor_angleunit)
        noise = ltasip.Pixel(rmsnoise, units="Jy/beam")
        shared = dataproduct_map.get_dict()
        pyxb_product = ltasip.SkyImageDataProduct(
            numberOfAxes=numberofaxes,
            numberOfCoordinates=coordinate_count,
            coordinate=pyxb_coordinates,
            locationFrame=locationframe,
            timeFrame=timeframe,
            observationPointing=pointing,
            restoringBeamMajor=beam_major,
            restoringBeamMinor=beam_minor,
            rmsNoise=noise,
            **shared
        )
        super(SkyImageDataProduct, self).__init__(pyxb_product)
class CorrelatedDataProduct(__DataProduct):
    """Dataproduct wrapper around an ``ltasip.CorrelatedDataProduct``.

    subarraypointing_identifier must provide get_pyxb_identifier().
    Frequencies and the integration interval are given as value plus unit;
    stationsubband is optional.
    """

    def __init__(self, dataproduct_map, subarraypointing_identifier,
                 subband, starttime, duration,
                 integrationinterval, integrationintervalunit,
                 central_frequency, central_frequencyunit,
                 channelwidth_frequency, channelwidth_frequencyunit,
                 channelspersubband,
                 stationsubband=None):
        pointing_id = subarraypointing_identifier.get_pyxb_identifier()
        central = ltasip.Frequency(central_frequency, units=central_frequencyunit)
        width = ltasip.Frequency(channelwidth_frequency, units=channelwidth_frequencyunit)
        interval = ltasip.Time(integrationinterval, units=integrationintervalunit)
        shared = dataproduct_map.get_dict()
        pyxb_product = ltasip.CorrelatedDataProduct(
            subArrayPointingIdentifier=pointing_id,
            subband=subband,
            stationSubband=stationsubband,
            startTime=starttime,
            duration=duration,
            integrationInterval=interval,
            centralFrequency=central,
            channelWidth=width,
            channelsPerSubband=channelspersubband,
            **shared
        )
        super(CorrelatedDataProduct, self).__init__(pyxb_product)
class InstrumentModelDataProduct(__DataProduct):
    """Dataproduct wrapper around an ``ltasip.InstrumentModelDataProduct``."""

    def __init__(self, dataproduct_map):
        pyxb_product = ltasip.InstrumentModelDataProduct(**dataproduct_map.get_dict())
        super(InstrumentModelDataProduct, self).__init__(pyxb_product)
class TransientBufferBoardDataProduct(__DataProduct):
    """Dataproduct wrapper around an ``ltasip.TransientBufferBoardDataProduct``.

    triggertype and triggervalue are combined into one ltasip.TBBTrigger.
    """

    def __init__(self, dataproduct_map, numberofsamples, timestamp,
                 triggertype, triggervalue):
        trigger = ltasip.TBBTrigger(type=triggertype, value=triggervalue)
        shared = dataproduct_map.get_dict()
        pyxb_product = ltasip.TransientBufferBoardDataProduct(
            numberOfSamples=numberofsamples,
            timeStamp=timestamp,
            triggerParameters=trigger,
            **shared
        )
        super(TransientBufferBoardDataProduct, self).__init__(pyxb_product)
class PulpSummaryDataProduct(__DataProduct):
    """Dataproduct wrapper around an ``ltasip.PulpSummaryDataProduct``."""

    def __init__(self, dataproduct_map, filecontent, datatype):
        shared = dataproduct_map.get_dict()
        pyxb_product = ltasip.PulpSummaryDataProduct(
            dataType=datatype,
            fileContent=filecontent,
            **shared
        )
        super(PulpSummaryDataProduct, self).__init__(pyxb_product)
class PulpDataProduct(__DataProduct):
    """Dataproduct wrapper around an ``ltasip.PulpDataProduct``.

    arraybeam -- object providing get_pyxb_beam()
    """

    def __init__(self, dataproduct_map, filecontent, datatype, arraybeam):
        pyxb_beam = arraybeam.get_pyxb_beam()
        shared = dataproduct_map.get_dict()
        pyxb_product = ltasip.PulpDataProduct(
            dataType=datatype,
            fileContent=filecontent,
            arrayBeam=pyxb_beam,
            **shared
        )
        super(PulpDataProduct, self).__init__(pyxb_product)
class BeamFormedDataProduct(__DataProduct):
    """Dataproduct wrapper around an ``ltasip.BeamFormedDataProduct``.

    beams -- optional sequence of objects providing get_pyxb_beam();
             ``numberOfBeams`` is derived from its length (0 if omitted)
    """

    def __init__(self,
                 dataproduct_map,
                 #numberofbeams,
                 beams=None):
        if beams:
            pyxb_beams = ltasip.ArrayBeams()
            beam_count = len(beams)
            for entry in beams:
                pyxb_beams.append(entry.get_pyxb_beam())
        else:
            # No beams: pass None for the list and a count of zero.
            pyxb_beams = None
            beam_count = 0

        shared = dataproduct_map.get_dict()
        pyxb_product = ltasip.BeamFormedDataProduct(
            numberOfBeams=beam_count,
            beams=pyxb_beams,
            **shared
        )
        super(BeamFormedDataProduct, self).__init__(pyxb_product)
# ============
# Coordinates:
class SpectralCoordinate():
    """Wraps an ``ltasip.SpectralCoordinate``.

    quantity_type  -- passed to ltasip.SpectralQuantity as ``type``
    quantity_value -- passed to ltasip.SpectralQuantity as ``value_``
    axis           -- a LinearAxis or a TabularAxis

    Raises TypeError if axis is neither a LinearAxis nor a TabularAxis.
    """

    def __init__(self,
                 quantity_type,
                 quantity_value,
                 axis,
                 ):
        # Pick the keyword that matches the axis flavour. An unsupported axis
        # is rejected here instead of building a coordinate without any axis.
        if isinstance(axis, LinearAxis):
            axis_keyword = "spectralLinearAxis"
        elif isinstance(axis, TabularAxis):
            axis_keyword = "spectralTabularAxis"
        else:
            raise TypeError("wrong axis type: %s" % (type(axis),))

        args = dict(spectralQuantity=ltasip.SpectralQuantity(value_=quantity_value, type=quantity_type))
        args[axis_keyword] = axis.get_pyxb_axis()
        self.__pyxb_coordinate = ltasip.SpectralCoordinate(**args)

    def get_pyxb_coordinate(self):
        """Return the wrapped pyxb coordinate object."""
        return self.__pyxb_coordinate
class TimeCoordinate():
    """Wraps an ``ltasip.TimeCoordinate``.

    equinox -- passed on as ``equinox``
    axis    -- a LinearAxis or a TabularAxis

    Raises TypeError if axis is neither a LinearAxis nor a TabularAxis.
    """

    def __init__(self,
                 equinox,
                 axis,
                 ):
        # Pick the keyword that matches the axis flavour. An unsupported axis
        # is rejected here instead of building a coordinate without any axis.
        if isinstance(axis, LinearAxis):
            axis_keyword = "timeLinearAxis"
        elif isinstance(axis, TabularAxis):
            axis_keyword = "timeTabularAxis"
        else:
            raise TypeError("wrong axis type: %s" % (type(axis),))

        args = dict(equinox=equinox)
        args[axis_keyword] = axis.get_pyxb_axis()
        self.__pyxb_coordinate = ltasip.TimeCoordinate(**args)

    def get_pyxb_coordinate(self):
        """Return the wrapped pyxb coordinate object."""
        return self.__pyxb_coordinate
class PolarizationCoordinate():
    """Wraps an ``ltasip.PolarizationCoordinate``.

    tabularaxis   -- object providing get_pyxb_axis()
    polarizations -- passed on as ``polarization``
    """

    def __init__(self, tabularaxis, polarizations):
        pyxb_axis = tabularaxis.get_pyxb_axis()
        self.__pyxb_coordinate = ltasip.PolarizationCoordinate(
            polarization=polarizations,
            polarizationTabularAxis=pyxb_axis,
        )

    def get_pyxb_coordinate(self):
        """Return the wrapped pyxb coordinate object."""
        return self.__pyxb_coordinate
class DirectionCoordinate():
    """Wraps an ``ltasip.DirectionCoordinate``.

    linearaxis_a / linearaxis_b must provide get_pyxb_axis(); both axes are
    passed on together as ``directionLinearAxis``. The pole angles are given
    as value plus unit.
    """

    def __init__(self, linearaxis_a, linearaxis_b,
                 pc0_0, pc0_1, pc1_0, pc1_1,
                 equinox, radecsystem, projection, projectionparameters,
                 longitudepole_angle, longitudepole_angleunit,
                 latitudepole_angle, latitudepole_angleunit):
        axes = [linearaxis_a.get_pyxb_axis(), linearaxis_b.get_pyxb_axis()]
        longitude_pole = ltasip.Angle(longitudepole_angle, units=longitudepole_angleunit)
        latitude_pole = ltasip.Angle(latitudepole_angle, units=latitudepole_angleunit)
        self.__pyxb_coordinate = ltasip.DirectionCoordinate(
            directionLinearAxis=axes,
            PC0_0=pc0_0,
            PC0_1=pc0_1,
            PC1_0=pc1_0,
            PC1_1=pc1_1,
            equinox=equinox,
            raDecSystem=radecsystem,
            projection=projection,
            projectionParameters=projectionparameters,
            longitudePole=longitude_pole,
            latitudePole=latitude_pole,
        )

    def get_pyxb_coordinate(self):
        """Return the wrapped pyxb coordinate object."""
        return self.__pyxb_coordinate
# ###########
# ArrayBeams:
class ArrayBeamMap():
    """Collects the constructor keywords that all array beam classes share.

    subarraypointing_identifier must provide get_pyxb_identifier(). Sampling
    time, central frequencies and channel width are given as value(s) plus unit.
    """

    def __init__(self, subarraypointing_identifier, beamnumber, dispersionmeasure,
                 numberofsubbands, stationsubbands,
                 samplingtime, samplingtimeunit,
                 centralfrequencies, centralfrequencies_unit,
                 channelwidth_frequency, channelwidth_frequencyunit,
                 channelspersubband, stokes):
        pointing_id = subarraypointing_identifier.get_pyxb_identifier()
        sampling = ltasip.Time(samplingtime, units=samplingtimeunit)
        frequencies = ltasip.ListOfFrequencies(frequencies=centralfrequencies, unit=centralfrequencies_unit)
        width = ltasip.Frequency(channelwidth_frequency, units=channelwidth_frequencyunit)
        self.arraybeam_map = {
            "subArrayPointingIdentifier": pointing_id,
            "beamNumber": beamnumber,
            "dispersionMeasure": dispersionmeasure,
            "numberOfSubbands": numberofsubbands,
            "stationSubbands": stationsubbands,
            "samplingTime": sampling,
            "centralFrequencies": frequencies,
            "channelWidth": width,
            "channelsPerSubband": channelspersubband,
            "stokes": stokes,
        }

    def get_dict(self):
        """Return the keyword dictionary."""
        return self.arraybeam_map
class SimpleArrayBeam():
    """Wraps a plain ``ltasip.ArrayBeam`` built from an array beam map."""

    def __init__(self, arraybeam_map):
        keywords = arraybeam_map.get_dict()
        self.__pyxb_beam = ltasip.ArrayBeam(**keywords)

    def get_pyxb_beam(self):
        """Return the wrapped pyxb beam object."""
        return self.__pyxb_beam
class CoherentStokesBeam():
    """Wraps an ``ltasip.CoherentStokesBeam``.

    pointing and offset are passed on unchanged under the same keyword names.
    """

    def __init__(self, arraybeam_map, pointing, offset):
        shared = arraybeam_map.get_dict()
        self.__pyxb_beam = ltasip.CoherentStokesBeam(
            offset=offset,
            pointing=pointing,
            **shared
        )

    def get_pyxb_beam(self):
        """Return the wrapped pyxb beam object."""
        return self.__pyxb_beam
class IncoherentStokesBeam():
    """Wraps an ``ltasip.IncoherentStokesBeam`` built from an array beam map."""

    def __init__(self, arraybeam_map):
        keywords = arraybeam_map.get_dict()
        self.__pyxb_beam = ltasip.IncoherentStokesBeam(**keywords)

    def get_pyxb_beam(self):
        """Return the wrapped pyxb beam object."""
        return self.__pyxb_beam
class FlysEyeBeam():
    """Wraps an ``ltasip.FlysEyeBeam``.

    station -- object providing get_pyxb_station()
    """

    def __init__(self, arraybeam_map, station):
        pyxb_station = station.get_pyxb_station()
        shared = arraybeam_map.get_dict()
        self.__pyxb_beam = ltasip.FlysEyeBeam(station=pyxb_station, **shared)

    def get_pyxb_beam(self):
        """Return the wrapped pyxb beam object."""
        return self.__pyxb_beam
# ###################
# Online processings:
class CorrelatorProcessing():
    """Wraps an ``ltasip.Correlator`` online processing.

    integrationinterval / integrationinterval_unit -- mandatory, value plus unit
    channelwidth_frequency / channelwidth_frequencyunit -- optional; the channel
        width is only set if both are given
    channelspersubband -- optional
    processingtype     -- defaults to "Correlator"
    """

    def __init__(self,
                 integrationinterval,
                 integrationinterval_unit,
                 channelwidth_frequency=None,
                 channelwidth_frequencyunit=None,
                 channelspersubband=None,
                 processingtype="Correlator",
                 ):
        channelwidth = None
        if channelwidth_frequency and channelwidth_frequencyunit:
            # No trailing comma here: that would wrap the Frequency in a tuple.
            channelwidth = ltasip.Frequency(channelwidth_frequency, units=channelwidth_frequencyunit)

        self.__pyxb_rtprocessing = ltasip.Correlator(
            integrationInterval=ltasip.Time(integrationinterval, units=integrationinterval_unit),
            processingType=processingtype,
            channelsPerSubband=channelspersubband
        )

        # The original author could not pass the width to the constructor, so
        # it is assigned afterwards. The element is spelled 'channelWidth'
        # (as in NonStandardProcessing); assigning to 'channelwidth' would only
        # create a plain Python attribute that never reaches the XML.
        if channelwidth is not None:
            self.__pyxb_rtprocessing.channelWidth = channelwidth

    def get_pyxb_rtprocessing(self):
        """Return the wrapped pyxb processing object."""
        return self.__pyxb_rtprocessing
class CoherentStokesProcessing():
    """Wraps an ``ltasip.CoherentStokes`` online processing.

    stations -- iterable of objects providing get_pyxb_station()
    Sampling times are given as value plus unit. The channel width is optional
    and only set if both value and unit are given.
    processingtype defaults to "Coherent Stokes".
    """

    def __init__(self,
                 rawsamplingtime,
                 rawsamplingtime_unit,
                 timesamplingdownfactor,
                 samplingtime,
                 samplingtime_unit,
                 stokes,
                 numberofstations,
                 stations,
                 frequencydownsamplingfactor=None,
                 numberofcollapsedchannels=None,
                 channelwidth_frequency=None,
                 channelwidth_frequencyunit=None,
                 channelspersubband=None,
                 processingtype="Coherent Stokes",
                 ):
        pyxb_stations = ltasip.Stations()
        for station in stations:
            pyxb_stations.append(station.get_pyxb_station())

        channelwidth = None
        if channelwidth_frequency and channelwidth_frequencyunit:
            channelwidth = ltasip.Frequency(channelwidth_frequency, units=channelwidth_frequencyunit)

        self.__pyxb_rtprocessing = ltasip.CoherentStokes(
            rawSamplingTime=ltasip.Time(rawsamplingtime, units=rawsamplingtime_unit),
            timeDownsamplingFactor=timesamplingdownfactor,
            samplingTime=ltasip.Time(samplingtime, units=samplingtime_unit),
            frequencyDownsamplingFactor=frequencydownsamplingfactor,
            numberOfCollapsedChannels=numberofcollapsedchannels,
            stokes=stokes,
            numberOfStations=numberofstations,
            stations=pyxb_stations,
            channelsPerSubband=channelspersubband,
            processingType=processingtype
        )

        # Assigned after construction, as the original author did. The element
        # is spelled 'channelWidth'; 'channelwidth' would only create a plain
        # Python attribute that never reaches the XML.
        if channelwidth is not None:
            self.__pyxb_rtprocessing.channelWidth = channelwidth

    def get_pyxb_rtprocessing(self):
        """Return the wrapped pyxb processing object."""
        return self.__pyxb_rtprocessing
# This is identical to coherent stokes. Redundancy already in the SIP schema...
class IncoherentStokesProcessing():
    """Wraps an ``ltasip.IncoherentStokes`` online processing.

    stations -- iterable of objects providing get_pyxb_station()
    Sampling times are given as value plus unit. The channel width is optional
    and only set if both value and unit are given.
    processingtype defaults to "Incoherent Stokes".
    """

    def __init__(self,
                 rawsamplingtime,
                 rawsamplingtime_unit,
                 timesamplingdownfactor,
                 samplingtime,
                 samplingtime_unit,
                 stokes,
                 numberofstations,
                 stations,
                 frequencydownsamplingfactor=None,
                 numberofcollapsedchannels=None,
                 channelwidth_frequency=None,
                 channelwidth_frequencyunit=None,
                 channelspersubband=None,
                 processingtype="Incoherent Stokes",
                 ):
        pyxb_stations = ltasip.Stations()
        for station in stations:
            pyxb_stations.append(station.get_pyxb_station())

        channelwidth = None
        if channelwidth_frequency and channelwidth_frequencyunit:
            # No trailing comma here: that would wrap the Frequency in a tuple.
            channelwidth = ltasip.Frequency(channelwidth_frequency, units=channelwidth_frequencyunit)

        self.__pyxb_rtprocessing = ltasip.IncoherentStokes(
            rawSamplingTime=ltasip.Time(rawsamplingtime, units=rawsamplingtime_unit),
            timeDownsamplingFactor=timesamplingdownfactor,
            samplingTime=ltasip.Time(samplingtime, units=samplingtime_unit),
            frequencyDownsamplingFactor=frequencydownsamplingfactor,
            numberOfCollapsedChannels=numberofcollapsedchannels,
            stokes=stokes,
            numberOfStations=numberofstations,
            stations=pyxb_stations,
            channelsPerSubband=channelspersubband,
            processingType=processingtype
        )

        # Assigned after construction, as the original author did. The element
        # is spelled 'channelWidth'; 'channelwidth' would only create a plain
        # Python attribute that never reaches the XML.
        if channelwidth is not None:
            self.__pyxb_rtprocessing.channelWidth = channelwidth

    def get_pyxb_rtprocessing(self):
        """Return the wrapped pyxb processing object."""
        return self.__pyxb_rtprocessing
class FlysEyeProcessing():
    """Wraps an ``ltasip.FlysEye`` online processing.

    Sampling times are given as value plus unit. The channel width is optional
    and only set if both value and unit are given.
    processingtype defaults to "Fly's Eye".
    """

    def __init__(self,
                 rawsamplingtime,
                 rawsamplingtime_unit,
                 timesamplingdownfactor,
                 samplingtime,
                 samplingtime_unit,
                 stokes,
                 channelwidth_frequency=None,
                 channelwidth_frequencyunit=None,
                 channelspersubband=None,
                 processingtype="Fly's Eye",
                 ):
        channelwidth = None
        if channelwidth_frequency and channelwidth_frequencyunit:
            channelwidth = ltasip.Frequency(channelwidth_frequency, units=channelwidth_frequencyunit)

        self.__pyxb_rtprocessing = ltasip.FlysEye(
            rawSamplingTime=ltasip.Time(rawsamplingtime, units=rawsamplingtime_unit),
            timeDownsamplingFactor=timesamplingdownfactor,
            samplingTime=ltasip.Time(samplingtime, units=samplingtime_unit),
            stokes=stokes,
            channelsPerSubband=channelspersubband,
            processingType=processingtype)

        # Assigned after construction, as the original author did. The element
        # is spelled 'channelWidth'; 'channelwidth' would only create a plain
        # Python attribute that never reaches the XML.
        if channelwidth is not None:
            self.__pyxb_rtprocessing.channelWidth = channelwidth

    def get_pyxb_rtprocessing(self):
        """Return the wrapped pyxb processing object."""
        return self.__pyxb_rtprocessing
class NonStandardProcessing():
    """Wraps an ``ltasip.NonStandard`` online processing.

    The channel width is given as value plus unit; processingtype defaults to
    "Non Standard".
    """

    def __init__(self, channelwidth_frequency, channelwidth_frequencyunit,
                 channelspersubband, processingtype="Non Standard"):
        width = ltasip.Frequency(channelwidth_frequency, units=channelwidth_frequencyunit)
        self.__pyxb_rtprocessing = ltasip.NonStandard(
            channelWidth=width,
            channelsPerSubband=channelspersubband,
            processingType=processingtype,
        )

    def get_pyxb_rtprocessing(self):
        """Return the wrapped pyxb processing object."""
        return self.__pyxb_rtprocessing
# ==============
# Processes:
class ProcessMap():
    """Collects the constructor keywords that all process classes share.

    identifier, observation_identifier and the optional parset_identifier must
    provide get_pyxb_identifier(); relations is an iterable of objects
    providing get_pyxb_processrelation(). The "parset" key is only present if
    a parset identifier was given.
    """

    def __init__(self, strategyname, strategydescription, starttime, duration,
                 identifier, observation_identifier, relations,
                 parset_identifier=None
                 ):
        pyxb_relations = ltasip.ProcessRelations()
        for relation in relations:
            pyxb_relations.append(relation.get_pyxb_processrelation())

        self.process_map = {
            "processIdentifier": identifier.get_pyxb_identifier(),
            "observationId": observation_identifier.get_pyxb_identifier(),
            "relations": pyxb_relations,
            "strategyName": strategyname,
            "strategyDescription": strategydescription,
            "startTime": starttime,
            "duration": duration,
        }
        if parset_identifier:
            self.process_map["parset"] = parset_identifier.get_pyxb_identifier()

    def get_dict(self):
        """Return the keyword dictionary."""
        return self.process_map
class ProcessRelation():
    """Wraps an ``ltasip.ProcessRelation``.

    identifier -- object providing get_pyxb_identifier()
    type       -- relation type, defaults to "GroupID"
    """

    def __init__(self, identifier, type="GroupID"):
        relation_type = ltasip.ProcessRelationType(type)
        pyxb_id = identifier.get_pyxb_identifier()
        self.__pyxb_processrelation = ltasip.ProcessRelation(
            relationType=relation_type,
            identifier=pyxb_id,
        )

    def get_pyxb_processrelation(self):
        """Return the wrapped pyxb process relation object."""
        return self.__pyxb_processrelation
class SimpleProcess():
    """Wraps a plain ``ltasip.Process`` built from a process map."""

    def __init__(self, process_map):
        keywords = process_map.get_dict()
        self.pyxb_process = ltasip.Process(**keywords)

    def get_pyxb_process(self):
        """Return the wrapped pyxb process object."""
        return self.pyxb_process
# ########
# Others:
class PointingRaDec():
    """Pointing given as right ascension and declination (``ltasip.Pointing``).

    Both angles are given as value plus unit.
    """

    def __init__(self, ra_angle, ra_angleunit, dec_angle, dec_angleunit, equinox):
        right_ascension = ltasip.Angle(ra_angle, units=ra_angleunit)
        declination = ltasip.Angle(dec_angle, units=dec_angleunit)
        self.__pyxb_pointing = ltasip.Pointing(
            rightAscension=right_ascension,
            declination=declination,
            equinox=equinox,
        )

    def get_pyxb_pointing(self):
        """Return the wrapped pyxb pointing object."""
        return self.__pyxb_pointing
class PointingAltAz():
    """Pointing given as azimuth and altitude (``ltasip.Pointing``).

    Both angles are given as value plus unit.
    """

    def __init__(self, az_angle, az_angleunit, alt_angle, alt_angleunit, equinox):
        azimuth = ltasip.Angle(az_angle, units=az_angleunit)
        altitude = ltasip.Angle(alt_angle, units=alt_angleunit)
        self.__pyxb_pointing = ltasip.Pointing(
            azimuth=azimuth,
            altitude=altitude,
            equinox=equinox,
        )

    def get_pyxb_pointing(self):
        """Return the wrapped pyxb pointing object."""
        return self.__pyxb_pointing
class LinearAxis():
    """Wraps an ``ltasip.LinearAxis``; every parameter maps to one keyword."""

    def __init__(self, number, name, units, length,
                 increment, referencepixel, referencevalue):
        fields = {
            "number": number,
            "name": name,
            "units": units,
            "length": length,
            "increment": increment,
            "referencePixel": referencepixel,
            "referenceValue": referencevalue,
        }
        self.__pyxb_axis = ltasip.LinearAxis(**fields)

    def get_pyxb_axis(self):
        """Return the wrapped pyxb axis object."""
        return self.__pyxb_axis
class TabularAxis():
    """Wraps an ``ltasip.TabularAxis``; every parameter maps to one keyword."""

    def __init__(self, number, name, units, length):
        fields = {
            "number": number,
            "name": name,
            "units": units,
            "length": length,
        }
        self.__pyxb_axis = ltasip.TabularAxis(**fields)

    def get_pyxb_axis(self):
        """Return the wrapped pyxb axis object."""
        return self.__pyxb_axis
class SubArrayPointing():
    """Wraps an ``ltasip.SubArrayPointing``.

    pointing   -- object providing get_pyxb_pointing()
    identifier -- object providing get_pyxb_identifier()
    relations  -- iterable of objects providing get_pyxb_processrelation()
    The five *processing parameters are optional objects providing
    get_pyxb_rtprocessing(); ``processing`` stays None if none is given.
    """

    def __init__(self, pointing, beamnumber, identifier, measurementtype,
                 targetname, starttime, duration,
                 numberofprocessing,
                 numberofcorrelateddataproducts,
                 numberofbeamformeddataproducts,
                 relations,
                 correlatorprocessing=None,
                 coherentstokesprocessing=None,
                 incoherentstokesprocessing=None,
                 flyseyeprocessing=None,
                 nonstandardprocessing=None,
                 measurementdescription=None):
        pyxb_relations = ltasip.ProcessRelations()
        for relation in relations:
            pyxb_relations.append(relation.get_pyxb_processrelation())

        # Keep only the processings that were actually supplied, in fixed order.
        candidates = (correlatorprocessing, coherentstokesprocessing,
                      incoherentstokesprocessing, flyseyeprocessing,
                      nonstandardprocessing)
        supplied = [candidate for candidate in candidates if candidate]
        pyxb_processing = None
        if supplied:
            pyxb_processing = ltasip.Processing()
            for entry in supplied:
                pyxb_processing.append(entry.get_pyxb_rtprocessing())

        self.__pyxb_subarraypointing = ltasip.SubArrayPointing(
            pointing=pointing.get_pyxb_pointing(),
            beamNumber=beamnumber,
            subArrayPointingIdentifier=identifier.get_pyxb_identifier(),
            measurementType=measurementtype,
            measurementDescription=measurementdescription,
            targetName=targetname,
            startTime=starttime,
            duration=duration,
            numberOfProcessing=numberofprocessing,
            numberOfCorrelatedDataProducts=numberofcorrelateddataproducts,
            numberOfBeamFormedDataProducts=numberofbeamformeddataproducts,
            processing=pyxb_processing,
            relations=pyxb_relations,
        )

    def get_pyxb_subarraypointing(self):
        """Return the wrapped pyxb sub-array pointing object."""
        return self.__pyxb_subarraypointing
class Observation():
    """Wraps an ``ltasip.Observation``.

    stations                   -- iterable of objects providing get_pyxb_station()
    process_map                -- object providing get_dict() with the process keywords
    subarraypointings          -- optional iterable of objects providing
                                  get_pyxb_subarraypointing()
    transientbufferboardevents -- optional iterable of event sources; each becomes
                                  one ltasip.TransientBufferBoardEvent
    The clock is given as frequency value plus unit. The channel width is
    optional and only set if both value and unit are given.
    """

    def __init__(self,
                 observingmode,
                 instrumentfilter,
                 clock_frequency,
                 clock_frequencyunit,
                 stationselection,
                 antennaset,
                 timesystem,
                 numberofstations,
                 stations,
                 numberofsubarraypointings,
                 numberoftbbevents,
                 numberofcorrelateddataproducts,
                 numberofbeamformeddataproducts,
                 numberofbitspersample,
                 process_map,
                 channelwidth_frequency=None,
                 channelwidth_frequencyunit=None,
                 observationdescription=None,
                 channelspersubband=None,
                 subarraypointings=None,
                 transientbufferboardevents=None
                 ):
        pyxb_stations = ltasip.Stations()
        for station in stations:
            pyxb_stations.append(station.get_pyxb_station())

        tbbevents = None
        if transientbufferboardevents:
            tbbevents = ltasip.TransientBufferBoardEvents()
            for source in transientbufferboardevents:
                tbbevents.append(ltasip.TransientBufferBoardEvent(eventSource=source))

        pointings = None
        if subarraypointings:
            pointings = ltasip.SubArrayPointings()
            for point in subarraypointings:
                pointings.append(point.get_pyxb_subarraypointing())

        self.__pyxb_observation = ltasip.Observation(
            observingMode=observingmode,
            instrumentFilter=instrumentfilter,
            # The clock value is handed over as a string.
            clock=ltasip.ClockType(str(clock_frequency), units=clock_frequencyunit),
            stationSelection=stationselection,
            antennaSet=antennaset,
            timeSystem=timesystem,
            numberOfStations=numberofstations,
            stations=pyxb_stations,
            numberOfSubArrayPointings=numberofsubarraypointings,
            numberOftransientBufferBoardEvents=numberoftbbevents,
            numberOfCorrelatedDataProducts=numberofcorrelateddataproducts,
            numberOfBeamFormedDataProducts=numberofbeamformeddataproducts,
            numberOfBitsPerSample=numberofbitspersample,
            observationDescription=observationdescription,
            channelsPerSubband=channelspersubband,
            subArrayPointings=pointings,
            transientBufferBoardEvents=tbbevents,
            **process_map.get_dict()
        )

        # The channel width is optional, so it is assigned only when given
        # (building a Frequency from None inside the constructor call is not
        # possible). The element is spelled 'channelWidth'; the former
        # 'channelwidth' assignment, with a trailing comma, stored a 1-tuple in
        # a plain Python attribute that never reached the XML.
        if channelwidth_frequency and channelwidth_frequencyunit:
            self.__pyxb_observation.channelWidth = ltasip.Frequency(
                channelwidth_frequency, units=channelwidth_frequencyunit)

    def get_pyxb_observation(self):
        """Return the wrapped pyxb observation object."""
        return self.__pyxb_observation
# #############################################################################################
# ============
# SIP document
class Sip(object):
print "\n################"
print VERSION
print "################\n"
#-----------
# Base document
#---
def __init__(self,
project_code,
project_primaryinvestigator,
project_contactauthor,
#project_telescope,
project_description,
dataproduct,
project_coinvestigators=None,
):
self.sip = ltasip.ltaSip()
self.sip.sipGeneratorVersion = VERSION
self.sip.project = ltasip.Project(
projectCode=project_code,
primaryInvestigator=project_primaryinvestigator,
contactAuthor=project_contactauthor,
telescope="LOFAR",#project_telescope,
projectDescription=project_description,
coInvestigator=project_coinvestigators,
)
self.sip.dataProduct=dataproduct.get_pyxb_dataproduct()
self.get_prettyxml() # for validation
@classmethod
def from_xml(cls, xml):
newsip = Sip.__new__(Sip)
newsip.sip = ltasip.CreateFromDocument(xml)
newsip.get_prettyxml() # for validation
return newsip
#-------------
# Optional additions
#---
def add_related_dataproduct(self, dataproduct):
self.sip.relatedDataProduct.append(dataproduct.get_pyxb_dataproduct())
return self.get_prettyxml()
def add_observation(self, observation):
self.sip.observation.append(observation.get_pyxb_observation())
return self.get_prettyxml()
def add_pipelinerun(self, pipeline):
self.sip.pipelineRun.append(pipeline.get_pyxb_pipeline())
return self.get_prettyxml()
def add_unspecifiedprocess(self,
observingmode,
description,
process_map,
):
up = ltasip.UnspecifiedProcess(
observingMode=observingmode,
description=description,
**process_map.get_dict()
)
self.sip.unspecifiedProcess.append(up)
return self.get_prettyxml()
def add_parset(self,
identifier,
contents):
self.sip.parset.append(ltasip.Parset(
identifier=identifier.get_pyxb_identifier(),
contents=contents
))
return self.get_prettyxml()
def add_related_dataproduct_with_history(self, relateddataproduct_sip):
# add the dataproduct described by the SIP (if not there)
if not any(x.dataProductIdentifier.identifier == relateddataproduct_sip.sip.dataProduct.dataProductIdentifier.identifier for x in self.sip.relatedDataProduct):
self.sip.relatedDataProduct.append(relateddataproduct_sip.sip.dataProduct)
else:
print "WARNING: There already exists a dataproduct with id", relateddataproduct_sip.sip.dataProduct.dataProductIdentifier.identifier," - Will try to add any new related items anyway."
if relateddataproduct_sip.sip.relatedDataProduct:
# add related dataproducts (if not there already)
for dp in relateddataproduct_sip.sip.relatedDataProduct:
if not any(x.dataProductIdentifier.identifier == dp.dataProductIdentifier.identifier for x in self.sip.relatedDataProduct):
self.sip.relatedDataProduct.append(dp)
if relateddataproduct_sip.sip.observation:
# add related dataproducts (if not there already)
for obs in relateddataproduct_sip.sip.observation:
if not any(x.processIdentifier.identifier == obs.processIdentifier.identifier for x in self.sip.observation):
self.sip.observation.append(obs)
if relateddataproduct_sip.sip.pipelineRun:
# add related pipelineruns (if not there already)
for pipe in relateddataproduct_sip.sip.pipelineRun:
if not any(x.processIdentifier.identifier == pipe.processIdentifier.identifier for x in self.sip.pipelineRun):
self.sip.pipelineRun.append(pipe)
if relateddataproduct_sip.sip.unspecifiedProcess:
# add related unspecified processes (if not there already)
for unspec in relateddataproduct_sip.sip.unspecifiedProcess:
if not any(x.processIdentifier.identifier == unspec.processIdentifier.identifier for x in self.sip.unspecifiedProcess):
self.sip.unspecifiedProcess.append(unspec)
if relateddataproduct_sip.sip.parset:
# add related parsets (if not there already)
for par in relateddataproduct_sip.sip.parset:
if not any(x.identifier.identifier == par.identifier.identifier for x in self.sip.parset):
self.sip.parset.append(par)
return self.get_prettyxml()
# this will also validate the document so far
def get_prettyxml(self):
try:
dom = self.sip.toDOM()
#dom.documentElement.setAttributeNS(xsi.uri(), 'xsi:schemaLocation', "http://www.astron.nl/SIP-Lofar LTA-SIP-2.6.1.xsd")
return dom.toprettyxml()
except pyxb.ValidationError as err:
#print str(err)
print err.details()
raise err
def prettyprint(self):
print self.get_prettyxml()
def save_to_file(self, path):
path = os.path.expanduser(path)
with open(path, 'w+') as f:
f.write(self.get_prettyxml())