Skip to content
Snippets Groups Projects
Commit 03108ca7 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-761: Check if the types of complexTypes like CorrelatedDataproduct,...

TMSS-761: Check if the types of complexTypes like CorrelatedDataproduct, Observation, etc are annotated using xml attributes. The LTA catalogue code expects these attributes like xsi:type="sip:CorrelatedDataProduct"
parent 76bdb368
No related branches found
No related tags found
No related merge requests found
...@@ -34,6 +34,9 @@ from lofar.lta.sip import siplib, validator, constants, ltasip ...@@ -34,6 +34,9 @@ from lofar.lta.sip import siplib, validator, constants, ltasip
import os import os
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
from lxml import etree
# d = os.path.dirname(os.path.realpath(__file__)) # d = os.path.dirname(os.path.realpath(__file__))
TMPFILE_PATH = "test_siplib.xml" TMPFILE_PATH = "test_siplib.xml"
...@@ -69,23 +72,124 @@ def create_basicdoc(): ...@@ -69,23 +72,124 @@ def create_basicdoc():
# project_telescope="LOFAR", # project_telescope="LOFAR",
project_description = "awesome project", project_description = "awesome project",
project_coinvestigators = ["sidekick1", "sidekick2"], project_coinvestigators = ["sidekick1", "sidekick2"],
dataproduct = siplib.SimpleDataProduct( dataproduct = create_correlated_dataproduct() )
siplib.DataProductMap(
type = "Unknown",
identifier = dp_id, def create_observation() -> siplib.Observation:
size = 1024, return siplib.Observation(observingmode="Interferometer",
filename = "/home/paulus/test.h5", instrumentfilter="10-70 MHz",
fileformat = "HDF5", clock_frequency='160',
storage_writer="Unknown", clock_frequencyunit="MHz",
storage_writer_version="Unknown", stationselection="Core",
process_identifier = pipe_id, antennaset="HBA Zero",
checksum_md5 = "hash1", timesystem="UTC",
checksum_adler32 = "hash2", stations=[siplib.Station.preconfigured("RS106", ["LBA"]),
storageticket = "ticket" siplib.Station.preconfigured("DE609", ["HBA"])],
numberofstations=5,
numberofsubarraypointings=5,
numberoftbbevents=5,
numberofcorrelateddataproducts=5,
numberofbeamformeddataproducts=5,
numberofbitspersample=5,
process_map=create_processmap(),
observationdescription="description",
channelwidth_frequency=160,
channelwidth_frequencyunit="MHz",
channelspersubband=5,
subarraypointings=[siplib.SubArrayPointing(
pointing=siplib.PointingAltAz(
az_angle=20,
az_angleunit="degrees",
alt_angle=30,
alt_angleunit="degrees",
equinox="SUN"
),
beamnumber=5,
identifier=point_id,
measurementtype="All Sky",
targetname="Sun",
starttime="1980-03-23T10:20:15",
duration="P6Y3M10DT15H",
numberofprocessing=1,
numberofcorrelateddataproducts=2,
numberofbeamformeddataproducts=1,
relations=[siplib.ProcessRelation(
identifier=obs_id
)],
correlatorprocessing=siplib.CorrelatorProcessing(
integrationinterval=0.5,
integrationinterval_unit="ns",
channelwidth_frequency=160,
channelwidth_frequencyunit="MHz"
),
coherentstokesprocessing=siplib.CoherentStokesProcessing(
rawsamplingtime=20,
rawsamplingtime_unit="ns",
timesamplingdownfactor=2,
samplingtime=10,
samplingtime_unit="ns",
stokes=["XX"],
numberofstations=1,
stations=[
siplib.Station.preconfigured("CS002", ["HBA0", "HBA1"])],
frequencydownsamplingfactor=2,
numberofcollapsedchannels=2,
channelwidth_frequency=160,
channelwidth_frequencyunit="MHz",
channelspersubband=122
),
incoherentstokesprocessing=siplib.IncoherentStokesProcessing(
rawsamplingtime=20,
rawsamplingtime_unit="ns",
timesamplingdownfactor=2,
samplingtime=10,
samplingtime_unit="ns",
stokes=["XX"],
numberofstations=1,
stations=[
siplib.Station.preconfigured("CS003", ["HBA0", "HBA1"])],
frequencydownsamplingfactor=2,
numberofcollapsedchannels=2,
channelwidth_frequency=160,
channelwidth_frequencyunit="MHz",
channelspersubband=122
),
flyseyeprocessing=siplib.FlysEyeProcessing(
rawsamplingtime=10,
rawsamplingtime_unit="ms",
timesamplingdownfactor=2,
samplingtime=2,
samplingtime_unit="ms",
stokes=["I"],
),
nonstandardprocessing=siplib.NonStandardProcessing(
channelwidth_frequency=160,
channelwidth_frequencyunit="MHz",
channelspersubband=122
) )
)],
transientbufferboardevents=["event1", "event2"]
) )
def create_correlated_dataproduct() -> siplib.CorrelatedDataProduct:
return siplib.CorrelatedDataProduct(
create_dataproductmap(),
subarraypointing_identifier=sap_id,
subband="1",
starttime="1980-03-23T10:20:15",
duration="P6Y3M10DT15H",
integrationinterval=10,
integrationintervalunit="ms",
central_frequency=160,
central_frequencyunit="MHz",
channelwidth_frequency=200,
channelwidth_frequencyunit="MHz",
channelspersubband=122,
stationsubband=2,
) )
def create_processmap(): def create_processmap():
return siplib.ProcessMap( return siplib.ProcessMap(
strategyname = "strategy1", strategyname = "strategy1",
...@@ -299,23 +403,7 @@ class TestSIPlib(unittest.TestCase): ...@@ -299,23 +403,7 @@ class TestSIPlib(unittest.TestCase):
# add optional dataproduct item # add optional dataproduct item
logger.info("===\nAdded related correlated dataproduct:\n") logger.info("===\nAdded related correlated dataproduct:\n")
logger.info(mysip.add_related_dataproduct( logger.info(mysip.add_related_dataproduct(create_correlated_dataproduct()))
siplib.CorrelatedDataProduct(
create_dataproductmap(),
subarraypointing_identifier = sap_id,
subband = "1",
starttime = "1980-03-23T10:20:15",
duration = "P6Y3M10DT15H",
integrationinterval = 10,
integrationintervalunit = "ms",
central_frequency = 160,
central_frequencyunit = "MHz",
channelwidth_frequency = 200,
channelwidth_frequencyunit = "MHz",
channelspersubband = 122,
stationsubband = 2,
)
))
# add optional dataproduct item # add optional dataproduct item
logger.info("===\nAdding related pixelmap dataproduct:\n") logger.info("===\nAdding related pixelmap dataproduct:\n")
...@@ -414,102 +502,13 @@ class TestSIPlib(unittest.TestCase): ...@@ -414,102 +502,13 @@ class TestSIPlib(unittest.TestCase):
mysip.save_to_file(TMPFILE_PATH) mysip.save_to_file(TMPFILE_PATH)
self.assertTrue(validator.validate(TMPFILE_PATH)) self.assertTrue(validator.validate(TMPFILE_PATH))
def test_observation(self): def test_observation(self):
mysip = create_basicdoc() mysip = create_basicdoc()
# add optional observation item # add optional observation item
logger.info("===\nAdding observation:\n") logger.info("===\nAdding observation:\n")
logger.info(mysip.add_observation(siplib.Observation(observingmode = "Interferometer", mysip.add_observation(create_observation())
instrumentfilter = "10-70 MHz", logger.info(mysip)
clock_frequency = '160',
clock_frequencyunit = "MHz",
stationselection = "Core",
antennaset = "HBA Zero",
timesystem = "UTC",
stations = [siplib.Station.preconfigured("RS106", ["LBA"]),
siplib.Station.preconfigured("DE609", ["HBA"])],
numberofstations = 5,
numberofsubarraypointings = 5,
numberoftbbevents = 5,
numberofcorrelateddataproducts = 5,
numberofbeamformeddataproducts = 5,
numberofbitspersample = 5,
process_map = create_processmap(),
observationdescription = "description",
channelwidth_frequency = 160,
channelwidth_frequencyunit = "MHz",
channelspersubband = 5,
subarraypointings = [siplib.SubArrayPointing(
pointing = siplib.PointingAltAz(
az_angle = 20,
az_angleunit = "degrees",
alt_angle = 30,
alt_angleunit = "degrees",
equinox = "SUN"
),
beamnumber = 5,
identifier = point_id,
measurementtype = "All Sky",
targetname = "Sun",
starttime = "1980-03-23T10:20:15",
duration = "P6Y3M10DT15H",
numberofprocessing = 1,
numberofcorrelateddataproducts = 2,
numberofbeamformeddataproducts = 1,
relations = [siplib.ProcessRelation(
identifier = obs_id
)],
correlatorprocessing = siplib.CorrelatorProcessing(
integrationinterval = 0.5,
integrationinterval_unit = "ns",
channelwidth_frequency = 160,
channelwidth_frequencyunit = "MHz"
),
coherentstokesprocessing = siplib.CoherentStokesProcessing(
rawsamplingtime = 20,
rawsamplingtime_unit = "ns",
timesamplingdownfactor = 2,
samplingtime = 10,
samplingtime_unit = "ns",
stokes = ["XX"],
numberofstations = 1,
stations = [siplib.Station.preconfigured("CS002", ["HBA0", "HBA1"])],
frequencydownsamplingfactor = 2,
numberofcollapsedchannels = 2,
channelwidth_frequency = 160,
channelwidth_frequencyunit = "MHz",
channelspersubband = 122
),
incoherentstokesprocessing = siplib.IncoherentStokesProcessing(
rawsamplingtime = 20,
rawsamplingtime_unit = "ns",
timesamplingdownfactor = 2,
samplingtime = 10,
samplingtime_unit = "ns",
stokes = ["XX"],
numberofstations = 1,
stations = [siplib.Station.preconfigured("CS003", ["HBA0", "HBA1"])],
frequencydownsamplingfactor = 2,
numberofcollapsedchannels = 2,
channelwidth_frequency = 160,
channelwidth_frequencyunit = "MHz",
channelspersubband = 122
),
flyseyeprocessing = siplib.FlysEyeProcessing(
rawsamplingtime = 10,
rawsamplingtime_unit = "ms",
timesamplingdownfactor = 2,
samplingtime = 2,
samplingtime_unit = "ms",
stokes = ["I"],
),
nonstandardprocessing = siplib.NonStandardProcessing(
channelwidth_frequency = 160,
channelwidth_frequencyunit = "MHz",
channelspersubband = 122
)
)],
transientbufferboardevents = ["event1", "event2"]
)))
mysip.save_to_file(TMPFILE_PATH) mysip.save_to_file(TMPFILE_PATH)
self.assertTrue(validator.validate(TMPFILE_PATH)) self.assertTrue(validator.validate(TMPFILE_PATH))
...@@ -637,6 +636,43 @@ class TestSIPlib(unittest.TestCase): ...@@ -637,6 +636,43 @@ class TestSIPlib(unittest.TestCase):
mysip.save_to_file(TMPFILE_PATH) mysip.save_to_file(TMPFILE_PATH)
self.assertTrue(validator.validate(TMPFILE_PATH)) self.assertTrue(validator.validate(TMPFILE_PATH))
def test_type_attributes(self):
'''Check if the types of complexTypes like CorrelatedDataproduct, Observation, etc are annotated using xml attributes.
The LTA catalogue code expects these attributes like xsi:type="sip:CorrelatedDataProduct" '''
mysip = create_basicdoc()
mysip.add_observation(create_observation())
xml_doc = mysip.get_prettyxml()
logger.info(xml_doc)
# The LTA catalogue code expects these attributes like xsi:type="sip:CorrelatedDataProduct"
# So, use etree XML DOM parsing for correct lookup of the interesting elements,
# but also check explicitly if the attribute as plain text is as the LTA code expects it.
# (There many ways to use a namespace, and parsers like etree have a tendency to interpret the plain text and substitute abbreviations with fqdn namespaces)
xml_root = etree.fromstring(xml_doc) # for DOM parsing
xml_lines = xml_doc.split('\n') # for plain text parsing
# check dataproduct type attribute via etree
dataproduct = xml_root.xpath('dataProduct')[0]
self.assertTrue('{http://www.w3.org/2001/XMLSchema-instance}type' in dataproduct.attrib)
self.assertEqual('sip:CorrelatedDataProduct', dataproduct.attrib['{http://www.w3.org/2001/XMLSchema-instance}type'])
# check dataproduct type attribute via plain text
dataproduct_line = [l for l in xml_lines if l.lstrip().startswith('<dataProduct ')][0]
self.assertTrue('xsi:type="sip:CorrelatedDataProduct"' in dataproduct_line)
# check observation type attribute via etree
observation = xml_root.xpath('observation')[0]
self.assertTrue('{http://www.w3.org/2001/XMLSchema-instance}type' in observation.attrib)
self.assertEqual('sip:Observation', observation.attrib['{http://www.w3.org/2001/XMLSchema-instance}type'])
# check observation type attribute via plain text
observation_line = [l for l in xml_lines if l.lstrip().startswith('<observation ')][0]
self.assertTrue('xsi:type="sip:Observation"' in observation_line)
# run tests if main # run tests if main
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment