#!/usr/bin/env python3
# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# $Id: $
import os
import unittest
from datetime import datetime, timedelta  # used directly by the CycleReportTest and ProjectReportTest setup below
import requests
import logging
logger = logging.getLogger('lofar.'+__name__)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
from lofar.common.test_utils import exit_with_skipped_code_if_skip_integration_tests
exit_with_skipped_code_if_skip_integration_tests()
# Mandatory setup step:
# use setup/teardown magic for the TMSS test database, LDAP server and Django server
# (ignore PyCharm's unused-import warning; unittest does use the tmss_test_environment_unittest_setup module at runtime)
from lofar.sas.tmss.test.test_environment import TMSSTestEnvironment
tmss_test_env = TMSSTestEnvironment(start_workflow_service=True, enable_viewflow=True)
try:
tmss_test_env.start()
tmss_test_env.populate_schemas()
except Exception as e:
logger.exception(str(e))
tmss_test_env.stop()
exit(1)
# tell unittest to stop (and automagically clean up) the test database once all testing is done.
def tearDownModule():
tmss_test_env.stop()
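# Credentials and base URL for the REST calls below: authenticate against the test LDAP server and target the test Django server.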
AUTH = requests.auth.HTTPBasicAuth(tmss_test_env.ldap_server.dbcreds.user, tmss_test_env.ldap_server.dbcreds.password)
BASE_URL = tmss_test_env.django_server.url[:-1] if tmss_test_env.django_server.url.endswith('/') else tmss_test_env.django_server.url
from lofar.sas.tmss.test.tmss_test_data_django_models import *
# import and setup rest test data creator
from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
rest_data_creator = TMSSRESTTestDataCreator(BASE_URL, AUTH)
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess
from lofar.sas.tmss.tmss.exceptions import SubtaskInvalidStateException
from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset, convert_to_parset_dict
from lofar.common.json_utils import get_default_json_object_for_schema, add_defaults_to_json_object_for_schema
from lofar.sas.tmss.tmss.tmssapp.adapters.sip import generate_sip_for_dataproduct
from lofar.lta.sip import constants
from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions
from lofar.sas.resourceassignment.resourceassignmentestimator.resource_estimators import ObservationResourceEstimator, PulsarPipelineResourceEstimator
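
# Checks that observation subtask specifications are converted into the expected parset,
# and that the resulting parset is accepted by the ResourceEstimator.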
class ObservationParsetAdapterTest(unittest.TestCase):
def get_default_specifications(self):
subtask_template = models.SubtaskTemplate.objects.get(name='observation control')
return get_default_json_object_for_schema(subtask_template.schema)
def create_subtask(self, specifications_doc):
subtask_template = models.SubtaskTemplate.objects.get(name='observation control')
subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc)
subtask:models.Subtask = models.Subtask.objects.create(**subtask_data)
subtask.task_blueprints.set([models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())])
subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask))
dataproduct:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output))
return subtask
def test_correlator(self):
specifications_doc = self.get_default_specifications()
specifications_doc['COBALT']['version'] = 1
specifications_doc['COBALT']['correlator']['enabled'] = True
specifications_doc['stations']['digital_pointings'] = [
{ "name": "target1",
"subbands": list(range(8))
}
]
nr_files = 8 # = nr of subbands
subtask = self.create_subtask(specifications_doc)
parset = convert_to_parset_dict(subtask)
logger.info("test_correlator parset:",parset)
self.assertEqual(True, parset["Observation.DataProducts.Output_Correlated.enabled"])
self.assertEqual(False, parset["Observation.DataProducts.Output_CoherentStokes.enabled"])
self.assertEqual(False, parset["Observation.DataProducts.Output_IncoherentStokes.enabled"])
self.assertEqual(False, parset["Cobalt.BeamFormer.flysEye"])
# check whether parset is accepted by the ResourceEstimator
estimator = ObservationResourceEstimator()
estimations = estimator.verify_and_estimate(convert_to_parset_dict(subtask))
self.assertEqual([], estimations["errors"])
# check whether the ResourceEstimator agrees with our spec
self.assertEqual(nr_files, estimations["estimates"][0]["output_files"]["uv"][0]["properties"]["nr_of_uv_files"] * estimations["estimates"][0]["resource_count"])
def test_piggyback_keys(self):
specifications_doc = self.get_default_specifications()
subtask = self.create_subtask(specifications_doc)
parset = convert_to_parset_dict(subtask)
sub = [tb.scheduling_unit_blueprint for tb in subtask.task_blueprints.all()][0]
        # Assert the values are the same as those of the scheduling_unit_blueprint
self.assertEqual(sub.piggyback_allowed_aartfaac, parset["ObservationControl.StationControl.aartfaacPiggybackAllowed"])
self.assertEqual(sub.piggyback_allowed_tbb, parset["ObservationControl.StationControl.tbbPiggybackAllowed"])
def test_flyseye(self):
specifications_doc = self.get_default_specifications()
specifications_doc['COBALT']['version'] = 1
specifications_doc['COBALT']['correlator']['enabled'] = False
specifications_doc['stations']['station_list'] = ['CS001', 'CS002', 'RS205']
specifications_doc['stations']['antenna_set'] = 'HBA_DUAL'
specifications_doc['stations']['digital_pointings'] = [
{ "name": "target1",
"subbands": list(range(8))
}
]
specifications_doc['COBALT']['beamformer']['flyseye_pipelines'] = [
{ "coherent": {
"stokes": "IQUV",
"time_integration_factor": 4,
"channels_per_subband": 16
}
}
]
nr_files = 5 * 4 # 5 antenna fields (CS001HBA0, CS001HBA1, CS002HBA0, CS002HBA1, RS205HBA) * 4 stokes
subtask = self.create_subtask(specifications_doc)
parset = convert_to_parset_dict(subtask)
logger.info("test_flyseye parset:",parset)
self.assertEqual(True, parset["Cobalt.BeamFormer.flysEye"])
self.assertEqual(True, parset["Observation.DataProducts.Output_CoherentStokes.enabled"])
self.assertEqual(nr_files, len(parset["Observation.DataProducts.Output_CoherentStokes.filenames"]))
# check whether parset is accepted by the ResourceEstimator
estimator = ObservationResourceEstimator()
estimations = estimator.verify_and_estimate(parset)
self.assertEqual([], estimations["errors"])
# check whether the ResourceEstimator agrees with our spec
self.assertEqual(nr_files, estimations["estimates"][0]["output_files"]["cs"][0]["properties"]["nr_of_cs_files"] * estimations["estimates"][0]["resource_count"])
self.assertEqual(1, estimations["estimates"][0]["output_files"]["cs"][0]["properties"]["nr_of_cs_parts"])
self.assertEqual(4, estimations["estimates"][0]["output_files"]["cs"][0]["properties"]["nr_of_cs_stokes"])
def test_beamformer(self):
specifications_doc = self.get_default_specifications()
specifications_doc['COBALT']['version'] = 1
specifications_doc['COBALT']['correlator']['enabled'] = False
specifications_doc['stations']['digital_pointings'] = [
{ "name": "target1",
"subbands": list(range(8))
}
]
specifications_doc['COBALT']['beamformer']['tab_pipelines'] = [
{ "coherent": {
"stokes": "IQUV",
"time_integration_factor": 4,
"channels_per_subband": 16
},
"incoherent": {
"stokes": "IQUV",
"time_integration_factor": 4,
"channels_per_subband": 16
},
"SAPs": [
{ "name": "target1",
"tabs": [
{
"coherent": True,
"pointing": { "angle1": 1.0, "angle2": 2.0 }
},
{
"coherent": False
},
]
}
]
}
]
nr_cs_files = 1 * 4 # 1 TAB * 4 stokes
nr_is_files = 1 * 4 # 1 TAB * 4 stokes
subtask = self.create_subtask(specifications_doc)
parset = convert_to_parset_dict(subtask)
logger.info("test_beamformer parset:",parset)
self.assertEqual(True, parset["Observation.DataProducts.Output_CoherentStokes.enabled"])
self.assertEqual(nr_cs_files, len(parset["Observation.DataProducts.Output_CoherentStokes.filenames"]))
self.assertEqual(True, parset["Observation.DataProducts.Output_IncoherentStokes.enabled"])
self.assertEqual(nr_is_files, len(parset["Observation.DataProducts.Output_IncoherentStokes.filenames"]))
# check whether parset is accepted by the ResourceEstimator
estimator = ObservationResourceEstimator()
estimations = estimator.verify_and_estimate(parset)
self.assertEqual([], estimations["errors"])
# check whether the ResourceEstimator agrees with our spec
self.assertEqual(nr_cs_files, estimations["estimates"][0]["output_files"]["cs"][0]["properties"]["nr_of_cs_files"] * estimations["estimates"][0]["resource_count"])
self.assertEqual(1, estimations["estimates"][0]["output_files"]["cs"][0]["properties"]["nr_of_cs_parts"])
self.assertEqual(4, estimations["estimates"][0]["output_files"]["cs"][0]["properties"]["nr_of_cs_stokes"])
self.assertEqual(nr_is_files, estimations["estimates"][1]["output_files"]["is"][0]["properties"]["nr_of_is_files"] * estimations["estimates"][1]["resource_count"])
self.assertEqual(4, estimations["estimates"][1]["output_files"]["is"][0]["properties"]["nr_of_is_stokes"])
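
# Checks parset conversion for the 'pulsar pipeline' subtask template.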
class PulsarPipelineParsetAdapterTest(unittest.TestCase):
    def create_subtask(self, specifications_doc=None):
        # use None instead of a mutable default argument; fall back to an empty spec
        specifications_doc = specifications_doc if specifications_doc is not None else {}
        subtask_template = models.SubtaskTemplate.objects.get(name='pulsar pipeline')
        specifications_doc = add_defaults_to_json_object_for_schema(specifications_doc, subtask_template.schema)
subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc)
subtask:models.Subtask = models.Subtask.objects.create(**subtask_data)
subtask.task_blueprints.set([models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())])
subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask))
dataproduct:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output))
return subtask
def test_pulp(self):
subtask = self.create_subtask()
parset = convert_to_parset_dict(subtask)
logger.info("test_pulp parset:",parset)
self.assertEqual(True, parset["Observation.DataProducts.Output_Pulsar.enabled"])
# TODO: ResourceEstimator needs a predecessor observation with dataproducts, so we forgo that for now.
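
# SIP generation tests: create subtask, dataproduct and SAP objects, generate a SIP and
# inspect the resulting XML for the expected values and identifiers.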
class SIPadapterTest(unittest.TestCase):
def test_simple_sip_generate_from_dataproduct(self):
"""
Test if SIP is generated successfully when subtask, dataproduct and SAP objects are created
Check some value in the SIP (xml) output
Check that the SIP identifiers are in SIP (xml) output
Check the number of SIP identifiers are increased with 3
Check that all SIP identifiers are unique
"""
subtask_template = models.SubtaskTemplate.objects.get(name='observation control')
specifications_doc = get_default_json_object_for_schema(subtask_template.schema)
specifications_doc['stations']['filter'] = "HBA_210_250"
feedback_template = models.DataproductFeedbackTemplate.objects.get(name='feedback')
# feedback_doc = get_default_json_object_for_schema(feedback_template.schema) # todo <- fix the default generator, for some reason it does not produce valid json here...
feedback_doc = {'percentage_written': 100, 'frequency': {'subbands': [156], 'central_frequencies': [33593750.0], 'channel_width': 6103.515625, 'channels_per_subband': 32}, 'time': {'start_time': '2013-02-16T17:00:00', 'duration': 5.02732992172, 'sample_width': 2.00278016}, 'antennas': {'set': 'HBA_DUAL', 'fields': [{'type': 'HBA', 'field': 'HBA0', 'station': 'CS001'}, {'type': 'HBA', 'field': 'HBA1', 'station': 'CS001'}]}, 'target': {'pointing': {'angle1': 0, 'angle2': 0, 'direction_type': 'J2000'}}, 'samples': {'polarisations': ['XX', 'XY', 'YX', 'YY'], 'type': 'float', 'bits': 32, 'writer': 'standard', 'writer_version': '2.2.0', 'complex': True}, '$schema': 'http://127.0.0.1:8001/api/schemas/dataproductfeedbacktemplate/feedback/1#'}
for dp in specifications_doc['stations']['digital_pointings']:
dp['subbands'] = list(range(8))
# Create SubTask(output)
subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc)
subtask:models.Subtask = models.Subtask.objects.create(**subtask_data)
subtask.task_blueprints.set([models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())])
subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask))
# Create Dataproduct
dataproduct: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(feedback_doc=feedback_doc, producer=subtask_output))
# Create SAP
sap_template = models.SAPTemplate.objects.get(name="SAP")
specifications_doc = get_default_json_object_for_schema(sap_template.schema)
sap = models.SAP.objects.create(specifications_doc=specifications_doc, specifications_template=sap_template)
sap.save()
dataproduct.sap = sap
dataproduct.save()
sip = generate_sip_for_dataproduct(dataproduct)
# double-check that SIP contains values from feedback and specifications docs
self.assertIn(str(feedback_doc['frequency']['channel_width']), sip.get_prettyxml())
self.assertIn(str(feedback_doc['time']['start_time']), sip.get_prettyxml())
self.assertIn(constants.FILTERSELECTIONTYPE_210_250_MHZ, sip.get_prettyxml()) # specifications_doc: "HBA_210_250"
self.assertIn(str(subtask.global_identifier.unique_identifier), sip.get_prettyxml())
self.assertIn(str(dataproduct.global_identifier.unique_identifier), sip.get_prettyxml())
self.assertIn(str(sap.global_identifier.unique_identifier), sip.get_prettyxml())
        # assert that a MeasurementSet (visibilities) dataproduct in TMSS results in a CorrelatedDataProduct in the SIP
self.assertIn(str('<dataProduct xsi:type="sip:CorrelatedDataProduct">'), sip.get_prettyxml())
def test_simple_sip_generate_from_dataproduct_beamformed(self):
"""
Test if SIP is generated successfully when subtask, dataproduct and SAP objects are created
Check some value in the SIP (xml) output
Check that the SIP identifiers are in SIP (xml) output
Check the number of SIP identifiers are increased with 3
Check that all SIP identifiers are unique
"""
subtask_template = models.SubtaskTemplate.objects.get(name='observation control')
specifications_doc = get_default_json_object_for_schema(subtask_template.schema)
specifications_doc['stations']['filter'] = "HBA_210_250"
feedback_template = models.DataproductFeedbackTemplate.objects.get(name='feedback')
# feedback_doc = get_default_json_object_for_schema(feedback_template.schema) # todo <- fix the default generator, for some reason it does not produce valid json here...
feedback_doc = {'percentage_written': 100, 'frequency': {'subbands': [156], 'central_frequencies': [33593750.0], 'channel_width': 6103.515625, 'channels_per_subband': 32}, 'time': {'start_time': '2013-02-16T17:00:00', 'duration': 5.02732992172, 'sample_width': 2.00278016}, 'antennas': {'set': 'HBA_DUAL', 'fields': [{'type': 'HBA', 'field': 'HBA0', 'station': 'CS001'}, {'type': 'HBA', 'field': 'HBA1', 'station': 'CS001'}]}, 'target': {'pointing': {'angle1': 0, 'angle2': 0, 'direction_type': 'J2000'}}, 'samples': {'polarisations': ['XX', 'XY', 'YX', 'YY'], 'type': 'float', 'bits': 32, 'writer': 'standard', 'writer_version': '2.2.0', 'complex': True}, '$schema': 'http://127.0.0.1:8001/api/schemas/dataproductfeedbacktemplate/feedback/1#'}
for dp in specifications_doc['stations']['digital_pointings']:
dp['subbands'] = list(range(8))
# Create SubTask(output)
subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc)
subtask:models.Subtask = models.Subtask.objects.create(**subtask_data)
subtask.task_blueprints.set([models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())])
subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask))
# Create Dataproduct
dataproduct: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(feedback_doc=feedback_doc, producer=subtask_output,
dataformat=models.Dataformat.objects.get(value="Beamformed"),
datatype=models.Datatype.objects.get(value="time series")))
# Create SAP
sap_template = models.SAPTemplate.objects.get(name="SAP")
specifications_doc = get_default_json_object_for_schema(sap_template.schema)
sap = models.SAP.objects.create(specifications_doc=specifications_doc, specifications_template=sap_template)
sap.save()
dataproduct.sap = sap
dataproduct.save()
sip = generate_sip_for_dataproduct(dataproduct)
# double-check that SIP contains values from feedback and specifications docs
self.assertIn(str(feedback_doc['frequency']['channel_width']), sip.get_prettyxml())
self.assertIn(constants.FILTERSELECTIONTYPE_210_250_MHZ, sip.get_prettyxml()) # specifications_doc: "HBA_210_250"
for pol in feedback_doc['samples']['polarisations']:
self.assertIn(str(pol), sip.get_prettyxml())
self.assertIn(str(subtask.global_identifier.unique_identifier), sip.get_prettyxml())
self.assertIn(str(dataproduct.global_identifier.unique_identifier), sip.get_prettyxml())
self.assertIn(str(sap.global_identifier.unique_identifier), sip.get_prettyxml())
        # assert that a Beamformed dataproduct in TMSS results in a BeamFormedDataProduct in the SIP
self.assertIn(str('<dataProduct xsi:type="sip:BeamFormedDataProduct">'), sip.get_prettyxml())
# assert we get a coherent stokes beam by default
self.assertIn(str('CoherentStokesBeam'), sip.get_prettyxml())
# alter dataproduct, recreate sip
dataproduct.specifications_doc['coherent'] = False
dataproduct.save()
sip = generate_sip_for_dataproduct(dataproduct)
# assert we get an incoherent stokes beam
self.assertIn(str('<arrayBeam xsi:type="sip:IncoherentStokesBeam">'), sip.get_prettyxml())
# alter dataproduct, recreate sip
dataproduct.feedback_doc['antennas']['fields'] = [{'type': 'HBA', 'field': 'HBA0', 'station': 'CS001'}]
dataproduct.save()
sip = generate_sip_for_dataproduct(dataproduct)
# assert we get a flyseye beam if we have a single antenna field
self.assertIn(str('<arrayBeam xsi:type="sip:FlysEyeBeam">'), sip.get_prettyxml())
def test_simple_sip_generate_from_dataproduct_pulp(self):
"""
Test if SIP is generated successfully when subtask, dataproduct and SAP objects are created
Check some value in the SIP (xml) output
Check that the SIP identifiers are in SIP (xml) output
Check the number of SIP identifiers are increased with 3
Check that all SIP identifiers are unique
"""
subtask_template = models.SubtaskTemplate.objects.get(name='observation control')
specifications_doc = get_default_json_object_for_schema(subtask_template.schema)
specifications_doc['stations']['filter'] = "HBA_110_190"
feedback_template = models.DataproductFeedbackTemplate.objects.get(name='feedback')
# feedback_doc = get_default_json_object_for_schema(feedback_template.schema) # todo <- fix the default generator, for some reason it does not produce valid json here...
feedback_doc = {'percentage_written': 100, 'frequency': {'subbands': [152], 'central_frequencies': [33593750.0], 'channel_width': 3051.7578125, 'channels_per_subband': 64}, 'time': {'start_time': '2013-02-16T17:00:00', 'duration': 5.02732992172, 'sample_width': 2.00278016}, 'antennas': {'set': 'HBA_DUAL', 'fields': [{'type': 'HBA', 'field': 'HBA0', 'station': 'CS001'}, {'type': 'HBA', 'field': 'HBA1', 'station': 'CS001'}]}, 'target': {'pointing': {'angle1': 0, 'angle2': 0, 'direction_type': 'J2000'}, 'coherent': True}, 'samples': {'polarisations': ['XX', 'XY', 'YX', 'YY'], 'type': 'float', 'bits': 32, 'writer': 'standard', 'writer_version': '2.2.0', 'complex': True}, 'files': ['stokes/SAP0/CS003HBA1/L773569_SAP000_B005_S0_P000_bf.h5', 'stokes/SAP0/RS106HBA/L773569_SAP000_B046_S0_P000_bf.h5'], '$schema': 'http://127.0.0.1:8001/api/schemas/dataproductfeedbacktemplate/feedback/1#'}
for dp in specifications_doc['stations']['digital_pointings']:
dp['subbands'] = list(range(8))
# Create SubTask(output)
subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc)
subtask: models.Subtask = models.Subtask.objects.create(**subtask_data)
subtask.task_blueprints.set([models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())])
subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask))
# Create Dataproduct
dataproduct: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(feedback_doc=feedback_doc, producer=subtask_output,
dataformat=models.Dataformat.objects.get(value="pulp analysis"),
datatype=models.Datatype.objects.get(value="pulsar profile")))
# Create SAP
sap_template = models.SAPTemplate.objects.get(name="SAP")
specifications_doc = get_default_json_object_for_schema(sap_template.schema)
sap = models.SAP.objects.create(specifications_doc=specifications_doc, specifications_template=sap_template)
sap.save()
dataproduct.sap = sap
dataproduct.save()
# PULP ANALYSIS
sip = generate_sip_for_dataproduct(dataproduct)
# double-check that SIP contains values from feedback and specifications docs
self.assertIn(str(feedback_doc['frequency']['channel_width']), sip.get_prettyxml())
self.assertIn(constants.FILTERSELECTIONTYPE_110_190_MHZ, sip.get_prettyxml())
for pol in feedback_doc['samples']['polarisations']:
self.assertIn(str(pol), sip.get_prettyxml())
self.assertIn(str(subtask.global_identifier.unique_identifier), sip.get_prettyxml())
self.assertIn(str(dataproduct.global_identifier.unique_identifier), sip.get_prettyxml())
self.assertIn(str(sap.global_identifier.unique_identifier), sip.get_prettyxml())
# assert that a pulp analysis dataproduct in TMSS creates a PulpDataProduct in the SIP
self.assertIn(str('<dataProduct xsi:type="sip:PulpDataProduct">'), sip.get_prettyxml())
# assert beam type
self.assertIn(str('FlysEyeBeam'), sip.get_prettyxml())
# assert datatype
self.assertIn(str('<dataType>CoherentStokes</dataType>'), sip.get_prettyxml())
# assert fileformat
self.assertIn(str('<fileFormat>PULP</fileFormat>'), sip.get_prettyxml())
# alter dataproduct, recreate sip
dataproduct.feedback_doc['target']['coherent'] = False
dataproduct.save()
sip = generate_sip_for_dataproduct(dataproduct)
# assert datatype reflects change of coherent flag
self.assertIn(str('<dataType>IncoherentStokes</dataType>'), sip.get_prettyxml())
# PULP SUMMARY
# alter dataproduct, recreate sip
dataproduct.dataformat = models.Dataformat.objects.get(value="pulp summary")
dataproduct.feedback_doc['$schema'] = 'http://127.0.0.1:8001/api/schemas/dataproductfeedbacktemplate/pulp summary/1#'
dataproduct.save()
sip = generate_sip_for_dataproduct(dataproduct)
# assert datatype reflects change of dataformat
self.assertIn(str('<dataType>SummaryIncoherentStokes</dataType>'), sip.get_prettyxml())
# assert that a pulp summary dataproduct in TMSS creates a PulpSummaryDataProduct in the SIP
self.assertIn(str('<dataProduct xsi:type="sip:PulpSummaryDataProduct">'), sip.get_prettyxml())
# assert fileformat
self.assertIn(str('<fileFormat>PULP</fileFormat>'), sip.get_prettyxml())
class CycleReportTest(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
# Create requirements
cls.cycle = models.Cycle.objects.create(**Cycle_test_data(start=datetime.utcnow().isoformat(), stop=(datetime.utcnow() + timedelta(weeks=12)).isoformat()))
# Projects
cls.project = models.Project.objects.create(**Project_test_data(name='unassigned'))
cls.project.cycles.set([cls.cycle.pk])
cls.project_regular = models.Project.objects.create(**Project_test_data(name='regular'))
cls.project_regular.cycles.set([cls.cycle.pk])
cls.project_regular.project_category = models.ProjectCategory.objects.get(value='regular')
cls.project_regular.save()
cls.project_user_shared_support = models.Project.objects.create(**Project_test_data(name='user_shared_support'))
cls.project_user_shared_support.cycles.set([cls.cycle.pk])
cls.project_user_shared_support.project_category = models.ProjectCategory.objects.get(value='user_shared_support')
cls.project_user_shared_support.save()
cls.project_com = models.Project.objects.create(**Project_test_data(name='commissioning'))
cls.project_com.cycles.set([cls.cycle.pk])
cls.project_com.project_category = models.ProjectCategory.objects.get(value='commissioning')
cls.project_com.save()
cls.project_ddt = models.Project.objects.create(**Project_test_data(name='ddt'))
cls.project_ddt.cycles.set([cls.cycle.pk])
cls.project_ddt.project_category = models.ProjectCategory.objects.get(value='ddt')
cls.project_ddt.save()
cls.project_test = models.Project.objects.create(**Project_test_data(name='test'))
cls.project_test.cycles.set([cls.cycle.pk])
cls.project_test.project_category = models.ProjectCategory.objects.get(value='test')
cls.project_test.save()
# SU, SUD and TD
cls.projects_components = {}
for p in (cls.project, cls.project_regular, cls.project_user_shared_support, cls.project_com, cls.project_ddt, cls.project_test):
scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=cls.project))
scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data(scheduling_set=scheduling_set))
task_draft = models.TaskDraft.objects.create(**TaskDraft_test_data(scheduling_unit_draft=scheduling_unit_draft))
cls.projects_components[f'{p.name}'] = {'scheduling_set': scheduling_set, 'scheduling_unit_draft': scheduling_unit_draft, 'task_draft': task_draft}
# Create test_data_creator as superuser
cls.test_data_creator = TMSSRESTTestDataCreator(BASE_URL, AUTH)
response = requests.get(cls.test_data_creator.django_api_url + '/', auth=cls.test_data_creator.auth)
def _create_subtask_with_type_and_set_status(self, type, status=None, project_name=None):
"""
Help method to create a Subtask by specifying its type and (optionally) set the its status
and (optionally) a project to belong to.
"""
if not project_name:
project_name = 'unassigned'
sub = models.SchedulingUnitBlueprint.objects.create(**SchedulingUnitBlueprint_test_data(draft=self.projects_components[project_name]['scheduling_unit_draft']))
tb = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(task_draft=self.projects_components[project_name]['task_draft'], scheduling_unit_blueprint=sub))
# Create Subtask
subtask_template = models.SubtaskTemplate.objects.create(**SubtaskTemplate_test_data(subtask_type_value=type))
subtask = models.Subtask.objects.create(**Subtask_test_data(subtask_template=subtask_template))
subtask.task_blueprints.set([tb])
if status:
set_subtask_state_following_allowed_transitions(subtask, status)
return subtask
def test_create_cycle_report(self):
"""
Test create_cycle_report extra action.
"""
        # Create two Subtasks, of type 'observation' and 'pipeline', and set their state to 'finished'.
subtask_obs = self._create_subtask_with_type_and_set_status('observation', 'finished')
subtask_pip = self._create_subtask_with_type_and_set_status('pipeline', 'finished')
# Create SubtaskOutput and Dataproducts
subtask_output_obs = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_obs))
dp_interferometric_obs = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output_obs, dataformat=models.Dataformat.objects.get(value="MeasurementSet"), datatype=models.Datatype.objects.get(value="visibilities")))
dp_beamformed_obs = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output_obs, dataformat=models.Dataformat.objects.get(value="Beamformed"), datatype=models.Datatype.objects.get(value="time series")))
subtask_output_pip = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_pip))
dp_preprocessing_pip = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output_pip, dataformat=models.Dataformat.objects.get(value="MeasurementSet"), datatype=models.Datatype.objects.get(value="visibilities")))
dp_pulsar_pip1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output_pip, dataformat=models.Dataformat.objects.get(value="pulp summary"), datatype=models.Datatype.objects.get(value="pulsar profile")))
dp_pulsar_pip2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output_pip, dataformat=models.Dataformat.objects.get(value="pulp analysis"), datatype=models.Datatype.objects.get(value="time series")))
# Create generic and 'stand-alone' reservations
reservation_no_project = models.Reservation.objects.create(**Reservation_test_data(duration=300))
reservation_mixed = models.Reservation.objects.create(**Reservation_test_data(duration=500, project=self.project)) # Non-production project
reservation_project = models.Reservation.objects.create(**Reservation_test_data(duration=600, project=self.project_regular)) # Production project
reservation_template = models.ReservationTemplate.objects.get(name="reservation")
reservation_template_spec = get_default_json_object_for_schema(reservation_template.schema)
reservation_template_spec['activity']['type'] = 'stand-alone mode'
reservation_no_project_sa_mode = models.Reservation.objects.create(start_time=datetime.now(), stop_time=datetime.now()+timedelta(seconds=1200), name="SA no project", description="SA no project", specifications_template=reservation_template, specifications_doc=reservation_template_spec)
reservation_mixed_sa_mode = models.Reservation.objects.create(start_time=datetime.now(), stop_time=datetime.now()+timedelta(seconds=350), project=self.project, name="SA mixed no project", description="SA mixed no project", specifications_template=reservation_template, specifications_doc=reservation_template_spec)
reservation_project_sa_mode = models.Reservation.objects.create(start_time=datetime.now(), stop_time=datetime.now() + timedelta(seconds=800), project=self.project_regular, name="SA project", description="SA project", specifications_template=reservation_template, specifications_doc=reservation_template_spec)
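        # Expected totals below: the reservations created above amount to 300 + 500 + 600 = 1400 s in ILT mode
        # and 1200 + 350 + 800 = 2350 s in stand-alone mode, i.e. 3750 s in total.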
# Assertions
# Assert we get the expected object
response = requests.get(BASE_URL + '/cycle/%s/report' % self.cycle.pk, auth=self.test_data_creator.auth)
self.assertEqual(response.status_code, 200)
result = response.json()
# Assert data_ingested_per_site_and_category
data_per_site_and_cat = result['data_ingested_per_site_and_category']
self.assertEqual(data_per_site_and_cat['Interferometric Observation']['size__sum'], 123)
self.assertEqual(data_per_site_and_cat['Beamformed Observation']['size__sum'], 123)
self.assertEqual(data_per_site_and_cat['Preprocessing Pipeline']['size__sum'], 123)
self.assertEqual(data_per_site_and_cat['Pulsar Pipeline']['size__sum'], 246)
# Assert usage_mode
usage_mode = result['usage_mode']
self.assertAlmostEqual(usage_mode['all modes']['total'], 3750, places=4)
self.assertAlmostEqual(usage_mode['all modes']['observing'], 1400, places=4)
self.assertAlmostEqual(usage_mode['all modes']['idle/test'], 850, places=4)
self.assertAlmostEqual(usage_mode['stand-alone mode']['total'], 2350, places=4)
self.assertAlmostEqual(usage_mode['stand-alone mode']['no project'], 1200, places=4)
self.assertAlmostEqual(usage_mode['stand-alone mode']['project'], 800, places=4)
self.assertAlmostEqual(usage_mode['stand-alone mode']['mixed/no project'], 350, places=4)
self.assertAlmostEqual(usage_mode['ILT mode']['total'], 1400, places=4)
self.assertAlmostEqual(usage_mode['ILT mode']['observing'], 600, places=4)
self.assertAlmostEqual(usage_mode['ILT mode']['idle/test'], 500, places=4)
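
# Project report tests: check the durations, percentages and ingested data sizes returned by the
# /project/<pk>/report endpoint.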
class ProjectReportTest(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
# Create requirements
cls.project = models.Project.objects.create(**Project_test_data(name='test_for_report'))
cls.project_quota = models.ProjectQuota.objects.create(
**ProjectQuota_test_data(project=cls.project, resource_type=models.ResourceType.objects.create(
**ResourceType_test_data(quantity=models.Quantity.objects.get(value=models.Quantity.Choices.NUMBER.value)))))
cls.scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=cls.project))
cls.scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data(scheduling_set=cls.scheduling_set))
cls.task_draft = models.TaskDraft.objects.create(**TaskDraft_test_data(scheduling_unit_draft=cls.scheduling_unit_draft))
# Create test_data_creator as superuser
cls.test_data_creator = TMSSRESTTestDataCreator(BASE_URL, AUTH)
response = requests.get(cls.test_data_creator.django_api_url + '/', auth=cls.test_data_creator.auth)
def _get_SUB_with_subtask_and_set_status(self, status=None):
"""
Help method to create SUB, TaskBlueprint, Subtask and (optionally) set the latter's status.
"""
sub = models.SchedulingUnitBlueprint.objects.create(**SchedulingUnitBlueprint_test_data(draft=self.scheduling_unit_draft))
tb = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(task_draft=self.task_draft, scheduling_unit_blueprint=sub))
# Create Subtask of type 'ingest'
subtask_template = models.SubtaskTemplate.objects.create(**SubtaskTemplate_test_data(subtask_type_value='ingest'))
subtask = models.Subtask.objects.create(**Subtask_test_data(subtask_template=subtask_template))
subtask.task_blueprints.set([tb])
if status:
set_subtask_state_following_allowed_transitions(subtask, status)
return sub, tb, subtask
def test_create_project_report(self):
"""
Test create project extra action.
"""
# Create four SUBs and respectively set their states to: 'finished' x2 (so we can create dataproducts, compare
# their sizes, and have a successful and a failed SUBs), 'cancelled' and 'defined' (which means not cancelled).
succeeded_sub, _, succeeded_subtask = self._get_SUB_with_subtask_and_set_status('finished')
failed_sub, _, failed_subtask = self._get_SUB_with_subtask_and_set_status('finished')
cancelled_sub, _, cancelled_subtask = self._get_SUB_with_subtask_and_set_status('cancelled')
not_cancelled_sub, _, not_cancelled_subtask = self._get_SUB_with_subtask_and_set_status('defined')
# Set workflow flags so we have a successful and a failed SUBs
SchedulingUnitProcess.objects.create(su=succeeded_sub, results_accepted=True)
SchedulingUnitProcess.objects.create(su=failed_sub, results_accepted=False)
# Create SubtaskOutput and Dataproducts from subtask_output
subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=succeeded_subtask))
dataproduct1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output))
dataproduct2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output))
# Assert we get the expected object
response = requests.get(BASE_URL + '/project/%s/report' % self.project.pk, auth=self.test_data_creator.auth)
self.assertEqual(response.status_code, 200)
result = response.json()
# Assert Project and ProjectQuota ids
self.assertEqual(result['project'], self.project.pk)
self.assertEqual(result['quota'][0]['id'], self.project_quota.pk)
# Assert durations are well calculated
# The four SUBs (successful, failed, cancelled and not cancelled) have a duration/observed_duration of 600s each
self.assertAlmostEqual(result['durations']['total'], 2400, places=4)
self.assertAlmostEqual(result['durations']['total_not_cancelled'], 1800, places=4)
self.assertAlmostEqual(result['durations']['total_succeeded'], 600, places=4)
self.assertAlmostEqual(result['durations']['total_succeeded_A'], 600, places=4) # The succeeded SUB has prio A
self.assertAlmostEqual(result['durations']['total_succeeded_B'], 0, places=4)
self.assertAlmostEqual(result['durations']['total_failed'], 600, places=4)
self.assertAlmostEqual(result['durations']['total_observed'], 1200, places=4)
self.assertAlmostEqual(result['durations']['total_observed_succeeded'], 600, places=4)
self.assertAlmostEqual(result['durations']['total_observed_succeeded_A'], 600, places=4)
self.assertAlmostEqual(result['durations']['total_observed_succeeded_B'], 0, places=4)
self.assertAlmostEqual(result['durations']['total_observed_failed'], 600, places=4)
# Assert percentages
self.assertAlmostEqual(result['durations']['not_cancelled_perc'], 0.75, places=2)
self.assertAlmostEqual(result['durations']['succeeded_perc'], 0.25, places=2)
self.assertAlmostEqual(result['durations']['failed_perc'], 0.25, places=2)
self.assertAlmostEqual(result['durations']['observed_perc'], 0.50, places=2)
self.assertAlmostEqual(result['durations']['observed_succeeded_perc'], 0.25, places=2)
self.assertAlmostEqual(result['durations']['observed_failed_perc'], 0.25, places=2)
# There is only one successful SUB
self.assertEqual(result['SUBs']['successful'][0]['id'], succeeded_sub.pk)
self.assertEqual(result['SUBs']['successful'][0]['status'], 'finished')
self.assertEqual(result['SUBs']['successful'][0]['ingested_data_size'], 246)
# There is only one failed SUB
self.assertEqual(result['SUBs']['failed'][0]['id'], failed_sub.pk)
self.assertEqual(result['SUBs']['failed'][0]['status'], 'finished')
self.assertIsNone(result['SUBs']['failed'][0]['ingested_data_size'])
# There are just two dataproducts, each has a size of 123
self.assertEqual(result['LTA dataproducts']['size__sum'], 246)
# Just to check if the placeholder was added
self.assertIsNotNone(result['SAPs exposure']) # TODO: Implement test properly.
if __name__ == "__main__":
os.environ['TZ'] = 'UTC'
unittest.main()