Skip to content
Snippets Groups Projects
Select Git revision
  • 7429832db68323639ab6060074bf178d8b9fcd1d
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

t_adapter.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    t_adapter.py 41.48 KiB
    #!/usr/bin/env python3
    
    # Copyright (C) 2018    ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it and/or
    # modify it under the terms of the GNU General Public License as published
    # by the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    
    # $Id:  $
    
    import os
    import unittest
    import requests
    
    import logging
    logger = logging.getLogger('lofar.'+__name__)
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
    
    from lofar.common.test_utils import exit_with_skipped_code_if_skip_integration_tests
    exit_with_skipped_code_if_skip_integration_tests()
    
    # Do Mandatory setup step:
    # use setup/teardown magic for tmss test database, ldap server and django server
    # (ignore pycharm unused import statement, python unittests does use at RunTime the tmss_test_environment_unittest_setup module)
    
    from lofar.sas.tmss.test.test_environment import TMSSTestEnvironment
    tmss_test_env = TMSSTestEnvironment(start_workflow_service=True, enable_viewflow=True)
    try:
        tmss_test_env.start()
        tmss_test_env.populate_schemas()
    except Exception as e:
        logger.exception(str(e))
        tmss_test_env.stop()
        exit(1)
    
    # tell unittest to stop (and automagically cleanup) the test database once all testing is done.
    def tearDownModule():
        tmss_test_env.stop()
    
    
    AUTH = requests.auth.HTTPBasicAuth(tmss_test_env.ldap_server.dbcreds.user, tmss_test_env.ldap_server.dbcreds.password)
    BASE_URL = tmss_test_env.django_server.url[:-1] if tmss_test_env.django_server.url.endswith('/')  else tmss_test_env.django_server.url
    
    from lofar.sas.tmss.test.tmss_test_data_django_models import *
    
    # import and setup rest test data creator
    from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
    rest_data_creator = TMSSRESTTestDataCreator(BASE_URL, AUTH)
    
    from lofar.sas.tmss.tmss.tmssapp import models
    from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess
    from lofar.sas.tmss.tmss.exceptions import SubtaskInvalidStateException
    from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset, convert_to_parset_dict
    from lofar.common.json_utils import get_default_json_object_for_schema, add_defaults_to_json_object_for_schema
    from lofar.sas.tmss.tmss.tmssapp.adapters.sip import generate_sip_for_dataproduct
    from lofar.lta.sip import constants
    from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions
    
    from lofar.sas.resourceassignment.resourceassignmentestimator.resource_estimators import ObservationResourceEstimator, PulsarPipelineResourceEstimator
    
    
    class ObservationParsetAdapterTest(unittest.TestCase):
        def get_default_specifications(self):
            subtask_template = models.SubtaskTemplate.objects.get(name='observation control')
            return get_default_json_object_for_schema(subtask_template.schema)
    
        def create_subtask(self, specifications_doc):
            subtask_template = models.SubtaskTemplate.objects.get(name='observation control')
            subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc)
            subtask:models.Subtask = models.Subtask.objects.create(**subtask_data)
            subtask.task_blueprints.set([models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())])
            subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask))
            dataproduct:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output))
            return subtask
    
        def test_correlator(self):
            specifications_doc = self.get_default_specifications()
            specifications_doc['COBALT']['version'] = 1
            specifications_doc['COBALT']['correlator']['enabled'] = True
            specifications_doc['stations']['digital_pointings'] = [
              { "name": "target1",
                "subbands": list(range(8))
              }
            ]
    
            nr_files = 8 # = nr of subbands
    
            subtask = self.create_subtask(specifications_doc)
            parset = convert_to_parset_dict(subtask)
            logger.info("test_correlator parset:",parset)
    
            self.assertEqual(True, parset["Observation.DataProducts.Output_Correlated.enabled"])
            self.assertEqual(False, parset["Observation.DataProducts.Output_CoherentStokes.enabled"])
            self.assertEqual(False, parset["Observation.DataProducts.Output_IncoherentStokes.enabled"])
            self.assertEqual(False, parset["Cobalt.BeamFormer.flysEye"])
    
            # check whether parset is accepted by the ResourceEstimator
            estimator = ObservationResourceEstimator()
            estimations = estimator.verify_and_estimate(convert_to_parset_dict(subtask))
            self.assertEqual([],       estimations["errors"])
    
            # check whether the ResourceEstimator agrees with our spec
            self.assertEqual(nr_files, estimations["estimates"][0]["output_files"]["uv"][0]["properties"]["nr_of_uv_files"] * estimations["estimates"][0]["resource_count"])
    
        def test_piggyback_keys(self):
            specifications_doc = self.get_default_specifications()
            subtask = self.create_subtask(specifications_doc)
            parset = convert_to_parset_dict(subtask)
            sub = [tb.scheduling_unit_blueprint for tb in subtask.task_blueprints.all()][0]
    
            # Assert the values are the same of the scheduling_unit_blueprint
            self.assertEqual(sub.piggyback_allowed_aartfaac, parset["ObservationControl.StationControl.aartfaacPiggybackAllowed"])
            self.assertEqual(sub.piggyback_allowed_tbb, parset["ObservationControl.StationControl.tbbPiggybackAllowed"])
    
        def test_flyseye(self):
            specifications_doc = self.get_default_specifications()
            specifications_doc['COBALT']['version'] = 1
            specifications_doc['COBALT']['correlator']['enabled'] = False
            specifications_doc['stations']['station_list'] = ['CS001', 'CS002', 'RS205']
            specifications_doc['stations']['antenna_set'] = 'HBA_DUAL'
            specifications_doc['stations']['digital_pointings'] = [
              { "name": "target1",
                "subbands": list(range(8))
              }
            ]
    
            specifications_doc['COBALT']['beamformer']['flyseye_pipelines'] = [
                { "coherent": {
                    "stokes": "IQUV",
                    "time_integration_factor": 4,
                    "channels_per_subband": 16
                  }
                }
            ]
    
            nr_files = 5 * 4 # 5 antenna fields (CS001HBA0, CS001HBA1, CS002HBA0, CS002HBA1, RS205HBA) * 4 stokes
    
            subtask = self.create_subtask(specifications_doc)
            parset = convert_to_parset_dict(subtask)
            logger.info("test_flyseye parset:",parset)
    
            self.assertEqual(True,     parset["Cobalt.BeamFormer.flysEye"])
            self.assertEqual(True,     parset["Observation.DataProducts.Output_CoherentStokes.enabled"])
            self.assertEqual(nr_files, len(parset["Observation.DataProducts.Output_CoherentStokes.filenames"]))
    
            # check whether parset is accepted by the ResourceEstimator
            estimator = ObservationResourceEstimator()
            estimations = estimator.verify_and_estimate(parset)
            self.assertEqual([],       estimations["errors"])
    
            # check whether the ResourceEstimator agrees with our spec
            self.assertEqual(nr_files, estimations["estimates"][0]["output_files"]["cs"][0]["properties"]["nr_of_cs_files"] * estimations["estimates"][0]["resource_count"])
            self.assertEqual(1,        estimations["estimates"][0]["output_files"]["cs"][0]["properties"]["nr_of_cs_parts"])
            self.assertEqual(4,        estimations["estimates"][0]["output_files"]["cs"][0]["properties"]["nr_of_cs_stokes"])
    
        def test_beamformer(self):
            specifications_doc = self.get_default_specifications()
            specifications_doc['COBALT']['version'] = 1
            specifications_doc['COBALT']['correlator']['enabled'] = False
            specifications_doc['stations']['digital_pointings'] = [
              { "name": "target1",
                "subbands": list(range(8))
              }
            ]
    
            specifications_doc['COBALT']['beamformer']['tab_pipelines'] = [
                { "coherent": {
                    "stokes": "IQUV",
                    "time_integration_factor": 4,
                    "channels_per_subband": 16
                  },
                  "incoherent": {
                    "stokes": "IQUV",
                    "time_integration_factor": 4,
                    "channels_per_subband": 16
                  },
    
                  "SAPs": [
                    { "name": "target1",
                      "tabs": [
                        {
                          "coherent": True,
                          "pointing": { "angle1": 1.0, "angle2": 2.0 }
                        },
                        {
                          "coherent": False
                        },
                      ]
                    }
                  ]
                }
            ]
    
            nr_cs_files = 1 * 4 # 1 TAB * 4 stokes
            nr_is_files = 1 * 4 # 1 TAB * 4 stokes
    
            subtask = self.create_subtask(specifications_doc)
            parset = convert_to_parset_dict(subtask)
            logger.info("test_beamformer parset:",parset)
    
            self.assertEqual(True,        parset["Observation.DataProducts.Output_CoherentStokes.enabled"])
            self.assertEqual(nr_cs_files, len(parset["Observation.DataProducts.Output_CoherentStokes.filenames"]))
            self.assertEqual(True,        parset["Observation.DataProducts.Output_IncoherentStokes.enabled"])
            self.assertEqual(nr_is_files, len(parset["Observation.DataProducts.Output_IncoherentStokes.filenames"]))
    
            # check whether parset is accepted by the ResourceEstimator
            estimator = ObservationResourceEstimator()
            estimations = estimator.verify_and_estimate(parset)
            self.assertEqual([],       estimations["errors"])
    
            # check whether the ResourceEstimator agrees with our spec
            self.assertEqual(nr_cs_files, estimations["estimates"][0]["output_files"]["cs"][0]["properties"]["nr_of_cs_files"] * estimations["estimates"][0]["resource_count"])
            self.assertEqual(1,           estimations["estimates"][0]["output_files"]["cs"][0]["properties"]["nr_of_cs_parts"])
            self.assertEqual(4,           estimations["estimates"][0]["output_files"]["cs"][0]["properties"]["nr_of_cs_stokes"])
    
            self.assertEqual(nr_is_files, estimations["estimates"][1]["output_files"]["is"][0]["properties"]["nr_of_is_files"] * estimations["estimates"][1]["resource_count"])
            self.assertEqual(4,           estimations["estimates"][1]["output_files"]["is"][0]["properties"]["nr_of_is_stokes"])
    
    
    class PulsarPipelineParsetAdapterTest(unittest.TestCase):
        def create_subtask(self, specifications_doc={}):
            subtask_template = models.SubtaskTemplate.objects.get(name='pulsar pipeline')
            specifications_doc = add_defaults_to_json_object_for_schema(specifications_doc, subtask_template.schema)
    
            subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc)
            subtask:models.Subtask = models.Subtask.objects.create(**subtask_data)
    
            subtask.task_blueprints.set([models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())])
            subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask))
            dataproduct:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output))
            return subtask
    
        def test_pulp(self):
            subtask = self.create_subtask()
            parset = convert_to_parset_dict(subtask)
            logger.info("test_pulp parset:",parset)
    
            self.assertEqual(True, parset["Observation.DataProducts.Output_Pulsar.enabled"])
    
            # TODO: ResourceEstimator needs a predecessor observation with dataproducts, so we forgo that for now.
    
    
    class SIPadapterTest(unittest.TestCase):
        def test_simple_sip_generate_from_dataproduct(self):
            """
            Test if SIP is generated successfully when subtask, dataproduct and SAP objects are created
            Check some value in the SIP (xml) output
            Check that the SIP identifiers are in SIP (xml) output
            Check the number of SIP identifiers are increased with 3
            Check that all SIP identifiers are unique
            """
            subtask_template = models.SubtaskTemplate.objects.get(name='observation control')
            specifications_doc = get_default_json_object_for_schema(subtask_template.schema)
            specifications_doc['stations']['filter'] = "HBA_210_250"
            feedback_template = models.DataproductFeedbackTemplate.objects.get(name='feedback')
            # feedback_doc = get_default_json_object_for_schema(feedback_template.schema)  # todo <- fix the default generator, for some reason it does not produce valid json here...
            feedback_doc = {'percentage_written': 100, 'frequency': {'subbands': [156], 'central_frequencies': [33593750.0], 'channel_width': 6103.515625, 'channels_per_subband': 32}, 'time': {'start_time': '2013-02-16T17:00:00', 'duration': 5.02732992172, 'sample_width': 2.00278016}, 'antennas': {'set': 'HBA_DUAL', 'fields': [{'type': 'HBA', 'field': 'HBA0', 'station': 'CS001'}, {'type': 'HBA', 'field': 'HBA1', 'station': 'CS001'}]}, 'target': {'pointing': {'angle1': 0, 'angle2': 0, 'direction_type': 'J2000'}}, 'samples': {'polarisations': ['XX', 'XY', 'YX', 'YY'], 'type': 'float', 'bits': 32, 'writer': 'standard', 'writer_version': '2.2.0', 'complex': True}, '$schema': 'http://127.0.0.1:8001/api/schemas/dataproductfeedbacktemplate/feedback/1#'}
            for dp in specifications_doc['stations']['digital_pointings']:
                dp['subbands'] = list(range(8))
            # Create SubTask(output)
            subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc)
            subtask:models.Subtask = models.Subtask.objects.create(**subtask_data)
            subtask.task_blueprints.set([models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())])
            subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask))
            # Create Dataproduct
            dataproduct: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(feedback_doc=feedback_doc, producer=subtask_output))
    
            # Create SAP
            sap_template = models.SAPTemplate.objects.get(name="SAP")
            specifications_doc = get_default_json_object_for_schema(sap_template.schema)
            sap = models.SAP.objects.create(specifications_doc=specifications_doc, specifications_template=sap_template)
            sap.save()
    
            dataproduct.sap = sap
            dataproduct.save()
    
            sip = generate_sip_for_dataproduct(dataproduct)
    
            # double-check that SIP contains values from feedback and specifications docs
            self.assertIn(str(feedback_doc['frequency']['channel_width']), sip.get_prettyxml())
            self.assertIn(str(feedback_doc['time']['start_time']), sip.get_prettyxml())
            self.assertIn(constants.FILTERSELECTIONTYPE_210_250_MHZ, sip.get_prettyxml()) # specifications_doc: "HBA_210_250"
    
            self.assertIn(str(subtask.global_identifier.unique_identifier), sip.get_prettyxml())
            self.assertIn(str(dataproduct.global_identifier.unique_identifier), sip.get_prettyxml())
            self.assertIn(str(sap.global_identifier.unique_identifier), sip.get_prettyxml())
    
            # assert that a time MeasurementSet dataproduct in TMSS creates a CorrelatedDataproduct in the SIP
            self.assertIn(str('<dataProduct xsi:type="sip:CorrelatedDataProduct">'), sip.get_prettyxml())
    
        def test_simple_sip_generate_from_dataproduct_beamformed(self):
            """
            Test if SIP is generated successfully when subtask, dataproduct and SAP objects are created
            Check some value in the SIP (xml) output
            Check that the SIP identifiers are in SIP (xml) output
            Check the number of SIP identifiers are increased with 3
            Check that all SIP identifiers are unique
            """
            subtask_template = models.SubtaskTemplate.objects.get(name='observation control')
            specifications_doc = get_default_json_object_for_schema(subtask_template.schema)
            specifications_doc['stations']['filter'] = "HBA_210_250"
            feedback_template = models.DataproductFeedbackTemplate.objects.get(name='feedback')
            # feedback_doc = get_default_json_object_for_schema(feedback_template.schema)  # todo <- fix the default generator, for some reason it does not produce valid json here...
            feedback_doc = {'percentage_written': 100, 'frequency': {'subbands': [156], 'central_frequencies': [33593750.0], 'channel_width': 6103.515625, 'channels_per_subband': 32}, 'time': {'start_time': '2013-02-16T17:00:00', 'duration': 5.02732992172, 'sample_width': 2.00278016}, 'antennas': {'set': 'HBA_DUAL', 'fields': [{'type': 'HBA', 'field': 'HBA0', 'station': 'CS001'}, {'type': 'HBA', 'field': 'HBA1', 'station': 'CS001'}]}, 'target': {'pointing': {'angle1': 0, 'angle2': 0, 'direction_type': 'J2000'}}, 'samples': {'polarisations': ['XX', 'XY', 'YX', 'YY'], 'type': 'float', 'bits': 32, 'writer': 'standard', 'writer_version': '2.2.0', 'complex': True}, '$schema': 'http://127.0.0.1:8001/api/schemas/dataproductfeedbacktemplate/feedback/1#'}
            for dp in specifications_doc['stations']['digital_pointings']:
                dp['subbands'] = list(range(8))
            # Create SubTask(output)
            subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc)
            subtask:models.Subtask = models.Subtask.objects.create(**subtask_data)
            subtask.task_blueprints.set([models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())])
            subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask))
            # Create Dataproduct
            dataproduct: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(feedback_doc=feedback_doc, producer=subtask_output,
                                                                                                        dataformat=models.Dataformat.objects.get(value="Beamformed"),
                                                                                                        datatype=models.Datatype.objects.get(value="time series")))
    
            # Create SAP
            sap_template = models.SAPTemplate.objects.get(name="SAP")
            specifications_doc = get_default_json_object_for_schema(sap_template.schema)
            sap = models.SAP.objects.create(specifications_doc=specifications_doc, specifications_template=sap_template)
            sap.save()
    
            dataproduct.sap = sap
            dataproduct.save()
    
            sip = generate_sip_for_dataproduct(dataproduct)
    
            # double-check that SIP contains values from feedback and specifications docs
            self.assertIn(str(feedback_doc['frequency']['channel_width']), sip.get_prettyxml())
            self.assertIn(constants.FILTERSELECTIONTYPE_210_250_MHZ, sip.get_prettyxml()) # specifications_doc: "HBA_210_250"
            for pol in feedback_doc['samples']['polarisations']:
                self.assertIn(str(pol), sip.get_prettyxml())
    
            self.assertIn(str(subtask.global_identifier.unique_identifier), sip.get_prettyxml())
            self.assertIn(str(dataproduct.global_identifier.unique_identifier), sip.get_prettyxml())
            self.assertIn(str(sap.global_identifier.unique_identifier), sip.get_prettyxml())
    
            # assert that a Beamformed dataproduct in TMSS creates a BeamformedDataproduct in the SIP
            self.assertIn(str('<dataProduct xsi:type="sip:BeamFormedDataProduct">'), sip.get_prettyxml())
    
            # assert we get a coherent stokes beam by default
            self.assertIn(str('CoherentStokesBeam'), sip.get_prettyxml())
    
            # alter dataproduct, recreate sip
            dataproduct.specifications_doc['coherent'] = False
            dataproduct.save()
            sip = generate_sip_for_dataproduct(dataproduct)
    
            # assert we get an incoherent stokes beam
            self.assertIn(str('<arrayBeam xsi:type="sip:IncoherentStokesBeam">'), sip.get_prettyxml())
    
            # alter dataproduct, recreate sip
            dataproduct.feedback_doc['antennas']['fields'] = [{'type': 'HBA', 'field': 'HBA0', 'station': 'CS001'}]
            dataproduct.save()
            sip = generate_sip_for_dataproduct(dataproduct)
    
            # assert we get a flyseye beam if we have a single antenna field
            self.assertIn(str('<arrayBeam xsi:type="sip:FlysEyeBeam">'), sip.get_prettyxml())
    
        def test_simple_sip_generate_from_dataproduct_pulp(self):
            """
            Test if SIP is generated successfully when subtask, dataproduct and SAP objects are created
            Check some value in the SIP (xml) output
            Check that the SIP identifiers are in SIP (xml) output
            Check the number of SIP identifiers are increased with 3
            Check that all SIP identifiers are unique
            """
            subtask_template = models.SubtaskTemplate.objects.get(name='observation control')
            specifications_doc = get_default_json_object_for_schema(subtask_template.schema)
            specifications_doc['stations']['filter'] = "HBA_110_190"
            feedback_template = models.DataproductFeedbackTemplate.objects.get(name='feedback')
            # feedback_doc = get_default_json_object_for_schema(feedback_template.schema)  # todo <- fix the default generator, for some reason it does not produce valid json here...
            feedback_doc = {'percentage_written': 100, 'frequency': {'subbands': [152], 'central_frequencies': [33593750.0], 'channel_width': 3051.7578125, 'channels_per_subband': 64}, 'time': {'start_time': '2013-02-16T17:00:00', 'duration': 5.02732992172, 'sample_width': 2.00278016}, 'antennas': {'set': 'HBA_DUAL', 'fields': [{'type': 'HBA', 'field': 'HBA0', 'station': 'CS001'}, {'type': 'HBA', 'field': 'HBA1', 'station': 'CS001'}]}, 'target': {'pointing': {'angle1': 0, 'angle2': 0, 'direction_type': 'J2000'}, 'coherent': True}, 'samples': {'polarisations': ['XX', 'XY', 'YX', 'YY'], 'type': 'float', 'bits': 32, 'writer': 'standard', 'writer_version': '2.2.0', 'complex': True}, 'files': ['stokes/SAP0/CS003HBA1/L773569_SAP000_B005_S0_P000_bf.h5', 'stokes/SAP0/RS106HBA/L773569_SAP000_B046_S0_P000_bf.h5'], '$schema': 'http://127.0.0.1:8001/api/schemas/dataproductfeedbacktemplate/feedback/1#'}
            for dp in specifications_doc['stations']['digital_pointings']:
                dp['subbands'] = list(range(8))
            # Create SubTask(output)
            subtask_data = Subtask_test_data(subtask_template=subtask_template, specifications_doc=specifications_doc)
            subtask: models.Subtask = models.Subtask.objects.create(**subtask_data)
            subtask.task_blueprints.set([models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())])
            subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask))
            # Create Dataproduct
            dataproduct: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(feedback_doc=feedback_doc, producer=subtask_output,
                                                                                                        dataformat=models.Dataformat.objects.get(value="pulp analysis"),
                                                                                                        datatype=models.Datatype.objects.get(value="pulsar profile")))
    
            # Create SAP
            sap_template = models.SAPTemplate.objects.get(name="SAP")
            specifications_doc = get_default_json_object_for_schema(sap_template.schema)
            sap = models.SAP.objects.create(specifications_doc=specifications_doc, specifications_template=sap_template)
            sap.save()
    
            dataproduct.sap = sap
            dataproduct.save()
    
            # PULP ANALYSIS
    
            sip = generate_sip_for_dataproduct(dataproduct)
    
            # double-check that SIP contains values from feedback and specifications docs
            self.assertIn(str(feedback_doc['frequency']['channel_width']), sip.get_prettyxml())
            self.assertIn(constants.FILTERSELECTIONTYPE_110_190_MHZ, sip.get_prettyxml())
            for pol in feedback_doc['samples']['polarisations']:
                self.assertIn(str(pol), sip.get_prettyxml())
    
            self.assertIn(str(subtask.global_identifier.unique_identifier), sip.get_prettyxml())
            self.assertIn(str(dataproduct.global_identifier.unique_identifier), sip.get_prettyxml())
            self.assertIn(str(sap.global_identifier.unique_identifier), sip.get_prettyxml())
    
            # assert that a pulp analysis dataproduct in TMSS creates a PulpDataProduct in the SIP
            self.assertIn(str('<dataProduct xsi:type="sip:PulpDataProduct">'), sip.get_prettyxml())
    
            # assert beam type
            self.assertIn(str('FlysEyeBeam'), sip.get_prettyxml())
    
            # assert datatype
            self.assertIn(str('<dataType>CoherentStokes</dataType>'), sip.get_prettyxml())
    
            # assert fileformat
            self.assertIn(str('<fileFormat>PULP</fileFormat>'), sip.get_prettyxml())
    
            # alter dataproduct, recreate sip
            dataproduct.feedback_doc['target']['coherent'] = False
            dataproduct.save()
            sip = generate_sip_for_dataproduct(dataproduct)
    
            # assert datatype reflects change of coherent flag
            self.assertIn(str('<dataType>IncoherentStokes</dataType>'), sip.get_prettyxml())
    
            # PULP SUMMARY
    
            # alter dataproduct, recreate sip
            dataproduct.dataformat = models.Dataformat.objects.get(value="pulp summary")
            dataproduct.feedback_doc['$schema'] = 'http://127.0.0.1:8001/api/schemas/dataproductfeedbacktemplate/pulp summary/1#'
            dataproduct.save()
            sip = generate_sip_for_dataproduct(dataproduct)
    
            # assert datatype reflects change of dataformat
            self.assertIn(str('<dataType>SummaryIncoherentStokes</dataType>'), sip.get_prettyxml())
    
            # assert that a pulp summary dataproduct in TMSS creates a PulpSummaryDataProduct in the SIP
            self.assertIn(str('<dataProduct xsi:type="sip:PulpSummaryDataProduct">'), sip.get_prettyxml())
    
            # assert fileformat
            self.assertIn(str('<fileFormat>PULP</fileFormat>'), sip.get_prettyxml())
    
    
    class CycleReportTest(unittest.TestCase):
        @classmethod
        def setUpClass(cls) -> None:
            # Create requirements
            cls.cycle = models.Cycle.objects.create(**Cycle_test_data(start=datetime.utcnow().isoformat(), stop=(datetime.utcnow() + timedelta(weeks=12)).isoformat()))
            # Projects
            cls.project = models.Project.objects.create(**Project_test_data(name='unassigned'))
            cls.project.cycles.set([cls.cycle.pk])
    
            cls.project_regular = models.Project.objects.create(**Project_test_data(name='regular'))
            cls.project_regular.cycles.set([cls.cycle.pk])
            cls.project_regular.project_category = models.ProjectCategory.objects.get(value='regular')
            cls.project_regular.save()
    
            cls.project_user_shared_support = models.Project.objects.create(**Project_test_data(name='user_shared_support'))
            cls.project_user_shared_support.cycles.set([cls.cycle.pk])
            cls.project_user_shared_support.project_category = models.ProjectCategory.objects.get(value='user_shared_support')
            cls.project_user_shared_support.save()
    
            cls.project_com = models.Project.objects.create(**Project_test_data(name='commissioning'))
            cls.project_com.cycles.set([cls.cycle.pk])
            cls.project_com.project_category = models.ProjectCategory.objects.get(value='commissioning')
            cls.project_com.save()
    
            cls.project_ddt = models.Project.objects.create(**Project_test_data(name='ddt'))
            cls.project_ddt.cycles.set([cls.cycle.pk])
            cls.project_ddt.project_category = models.ProjectCategory.objects.get(value='ddt')
            cls.project_ddt.save()
    
            cls.project_test = models.Project.objects.create(**Project_test_data(name='test'))
            cls.project_test.cycles.set([cls.cycle.pk])
            cls.project_test.project_category = models.ProjectCategory.objects.get(value='test')
            cls.project_test.save()
    
            # SU, SUD and TD
            cls.projects_components = {}
            for p in (cls.project, cls.project_regular, cls.project_user_shared_support, cls.project_com, cls.project_ddt, cls.project_test):
                scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=cls.project))
                scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data(scheduling_set=scheduling_set))
                task_draft = models.TaskDraft.objects.create(**TaskDraft_test_data(scheduling_unit_draft=scheduling_unit_draft))
                cls.projects_components[f'{p.name}'] = {'scheduling_set': scheduling_set, 'scheduling_unit_draft': scheduling_unit_draft, 'task_draft': task_draft}
    
            # Create test_data_creator as superuser
            cls.test_data_creator = TMSSRESTTestDataCreator(BASE_URL, AUTH)
            response = requests.get(cls.test_data_creator.django_api_url + '/', auth=cls.test_data_creator.auth)
    
        def _create_subtask_with_type_and_set_status(self, type, status=None, project_name=None):
            """
            Help method to create a Subtask by specifying its type and (optionally) set the its status
            and (optionally) a project to belong to.
            """
            if not project_name:
                project_name = 'unassigned'
    
            sub = models.SchedulingUnitBlueprint.objects.create(**SchedulingUnitBlueprint_test_data(draft=self.projects_components[project_name]['scheduling_unit_draft']))
            tb = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(task_draft=self.projects_components[project_name]['task_draft'], scheduling_unit_blueprint=sub))
            # Create Subtask
            subtask_template = models.SubtaskTemplate.objects.create(**SubtaskTemplate_test_data(subtask_type_value=type))
            subtask = models.Subtask.objects.create(**Subtask_test_data(subtask_template=subtask_template))
            subtask.task_blueprints.set([tb])
    
            if status:
                set_subtask_state_following_allowed_transitions(subtask, status)
    
            return subtask
    
        def test_create_cycle_report(self):
            """
            Test create_cycle_report extra action.
            """
            # Create and set two Subtasks of type 'observation' and 'pipeline' with the state 'finished'.
            subtask_obs = self._create_subtask_with_type_and_set_status('observation', 'finished')
            subtask_pip = self._create_subtask_with_type_and_set_status('pipeline', 'finished')
    
            # Create SubtaskOutput and Dataproducts
            subtask_output_obs = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_obs))
            dp_interferometric_obs = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output_obs, dataformat=models.Dataformat.objects.get(value="MeasurementSet"), datatype=models.Datatype.objects.get(value="visibilities")))
            dp_beamformed_obs = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output_obs, dataformat=models.Dataformat.objects.get(value="Beamformed"), datatype=models.Datatype.objects.get(value="time series")))
            subtask_output_pip = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_pip))
            dp_preprocessing_pip = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output_pip, dataformat=models.Dataformat.objects.get(value="MeasurementSet"), datatype=models.Datatype.objects.get(value="visibilities")))
            dp_pulsar_pip1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output_pip, dataformat=models.Dataformat.objects.get(value="pulp summary"), datatype=models.Datatype.objects.get(value="pulsar profile")))
            dp_pulsar_pip2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output_pip, dataformat=models.Dataformat.objects.get(value="pulp analysis"), datatype=models.Datatype.objects.get(value="time series")))
    
            # Create generic and 'stand-alone' reservations
            reservation_no_project = models.Reservation.objects.create(**Reservation_test_data(duration=300))
            reservation_mixed = models.Reservation.objects.create(**Reservation_test_data(duration=500, project=self.project))    # Non-production project
            reservation_project = models.Reservation.objects.create(**Reservation_test_data(duration=600, project=self.project_regular))  # Production project
    
            reservation_template = models.ReservationTemplate.objects.get(name="reservation")
            reservation_template_spec = get_default_json_object_for_schema(reservation_template.schema)
            reservation_template_spec['activity']['type'] = 'stand-alone mode'
            reservation_no_project_sa_mode = models.Reservation.objects.create(start_time=datetime.now(), stop_time=datetime.now()+timedelta(seconds=1200), name="SA no project", description="SA no project", specifications_template=reservation_template, specifications_doc=reservation_template_spec)
            reservation_mixed_sa_mode = models.Reservation.objects.create(start_time=datetime.now(), stop_time=datetime.now()+timedelta(seconds=350), project=self.project, name="SA mixed no project", description="SA mixed no project", specifications_template=reservation_template, specifications_doc=reservation_template_spec)
            reservation_project_sa_mode = models.Reservation.objects.create(start_time=datetime.now(), stop_time=datetime.now() + timedelta(seconds=800), project=self.project_regular, name="SA project", description="SA project", specifications_template=reservation_template, specifications_doc=reservation_template_spec)
    
            # Assertions
    
            # Assert we get the expected object
            response = requests.get(BASE_URL + '/cycle/%s/report' % self.cycle.pk, auth=self.test_data_creator.auth)
            self.assertEqual(response.status_code, 200)
            result = response.json()
    
            # Assert data_ingested_per_site_and_category
            data_per_site_and_cat = result['data_ingested_per_site_and_category']
            self.assertEqual(data_per_site_and_cat['Interferometric Observation']['size__sum'], 123)
            self.assertEqual(data_per_site_and_cat['Beamformed Observation']['size__sum'], 123)
            self.assertEqual(data_per_site_and_cat['Preprocessing Pipeline']['size__sum'], 123)
            self.assertEqual(data_per_site_and_cat['Pulsar Pipeline']['size__sum'], 246)
    
            # Assert usage_mode
            usage_mode = result['usage_mode']
            self.assertAlmostEqual(usage_mode['all modes']['total'], 3750, places=4)
            self.assertAlmostEqual(usage_mode['all modes']['observing'], 1400, places=4)
            self.assertAlmostEqual(usage_mode['all modes']['idle/test'], 850, places=4)
    
            self.assertAlmostEqual(usage_mode['stand-alone mode']['total'], 2350, places=4)
            self.assertAlmostEqual(usage_mode['stand-alone mode']['no project'], 1200, places=4)
            self.assertAlmostEqual(usage_mode['stand-alone mode']['project'], 800, places=4)
            self.assertAlmostEqual(usage_mode['stand-alone mode']['mixed/no project'], 350, places=4)
    
            self.assertAlmostEqual(usage_mode['ILT mode']['total'], 1400, places=4)
            self.assertAlmostEqual(usage_mode['ILT mode']['observing'], 600, places=4)
            self.assertAlmostEqual(usage_mode['ILT mode']['idle/test'], 500, places=4)
    
    
    class ProjectReportTest(unittest.TestCase):
        @classmethod
        def setUpClass(cls) -> None:
            # Create requirements
            cls.project = models.Project.objects.create(**Project_test_data(name='test_for_report'))
            cls.project_quota = models.ProjectQuota.objects.create(
                **ProjectQuota_test_data(project=cls.project, resource_type=models.ResourceType.objects.create(
                    **ResourceType_test_data(quantity=models.Quantity.objects.get(value=models.Quantity.Choices.NUMBER.value)))))
            cls.scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=cls.project))
            cls.scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data(scheduling_set=cls.scheduling_set))
            cls.task_draft = models.TaskDraft.objects.create(**TaskDraft_test_data(scheduling_unit_draft=cls.scheduling_unit_draft))
    
            # Create test_data_creator as superuser
            cls.test_data_creator = TMSSRESTTestDataCreator(BASE_URL, AUTH)
            response = requests.get(cls.test_data_creator.django_api_url + '/', auth=cls.test_data_creator.auth)
    
        def _get_SUB_with_subtask_and_set_status(self, status=None):
            """
            Help method to create SUB, TaskBlueprint, Subtask and (optionally) set the latter's status.
            """
            sub = models.SchedulingUnitBlueprint.objects.create(**SchedulingUnitBlueprint_test_data(draft=self.scheduling_unit_draft))
            tb = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(task_draft=self.task_draft, scheduling_unit_blueprint=sub))
            # Create Subtask of type 'ingest'
            subtask_template = models.SubtaskTemplate.objects.create(**SubtaskTemplate_test_data(subtask_type_value='ingest'))
            subtask = models.Subtask.objects.create(**Subtask_test_data(subtask_template=subtask_template))
            subtask.task_blueprints.set([tb])
    
            if status:
                set_subtask_state_following_allowed_transitions(subtask, status)
    
            return sub, tb, subtask
    
        def test_create_project_report(self):
            """
            Test create project extra action.
            """
            # Create four SUBs and respectively set their states to: 'finished' x2 (so we can create dataproducts, compare
            # their sizes, and have a successful and a failed SUBs), 'cancelled' and 'defined' (which means not cancelled).
            succeeded_sub, _, succeeded_subtask = self._get_SUB_with_subtask_and_set_status('finished')
            failed_sub, _, failed_subtask = self._get_SUB_with_subtask_and_set_status('finished')
            cancelled_sub, _, cancelled_subtask = self._get_SUB_with_subtask_and_set_status('cancelled')
            not_cancelled_sub, _, not_cancelled_subtask = self._get_SUB_with_subtask_and_set_status('defined')
            # Set workflow flags so we have a successful and a failed SUBs
            SchedulingUnitProcess.objects.create(su=succeeded_sub, results_accepted=True)
            SchedulingUnitProcess.objects.create(su=failed_sub, results_accepted=False)
    
            # Create SubtaskOutput and Dataproducts from subtask_output
            subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=succeeded_subtask))
            dataproduct1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output))
            dataproduct2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output))
    
            # Assert we get the expected object
            response = requests.get(BASE_URL + '/project/%s/report' % self.project.pk, auth=self.test_data_creator.auth)
            self.assertEqual(response.status_code, 200)
            result = response.json()
    
            # Assert Project and ProjectQuota ids
            self.assertEqual(result['project'], self.project.pk)
            self.assertEqual(result['quota'][0]['id'], self.project_quota.pk)
    
            # Assert durations are well calculated
            # The four SUBs (successful, failed, cancelled and not cancelled) have a duration/observed_duration of 600s each
            self.assertAlmostEqual(result['durations']['total'], 2400, places=4)
            self.assertAlmostEqual(result['durations']['total_not_cancelled'], 1800, places=4)
            self.assertAlmostEqual(result['durations']['total_succeeded'], 600, places=4)
            self.assertAlmostEqual(result['durations']['total_succeeded_A'], 600, places=4)  # The succeeded SUB has prio A
            self.assertAlmostEqual(result['durations']['total_succeeded_B'], 0, places=4)
            self.assertAlmostEqual(result['durations']['total_failed'], 600, places=4)
            self.assertAlmostEqual(result['durations']['total_observed'], 1200, places=4)
            self.assertAlmostEqual(result['durations']['total_observed_succeeded'], 600, places=4)
            self.assertAlmostEqual(result['durations']['total_observed_succeeded_A'], 600, places=4)
            self.assertAlmostEqual(result['durations']['total_observed_succeeded_B'], 0, places=4)
            self.assertAlmostEqual(result['durations']['total_observed_failed'], 600, places=4)
            # Assert percentages
            self.assertAlmostEqual(result['durations']['not_cancelled_perc'], 0.75, places=2)
            self.assertAlmostEqual(result['durations']['succeeded_perc'], 0.25, places=2)
            self.assertAlmostEqual(result['durations']['failed_perc'], 0.25, places=2)
            self.assertAlmostEqual(result['durations']['observed_perc'], 0.50, places=2)
            self.assertAlmostEqual(result['durations']['observed_succeeded_perc'], 0.25, places=2)
            self.assertAlmostEqual(result['durations']['observed_failed_perc'], 0.25, places=2)
    
            # There is only one successful SUB
            self.assertEqual(result['SUBs']['successful'][0]['id'], succeeded_sub.pk)
            self.assertEqual(result['SUBs']['successful'][0]['status'], 'finished')
            self.assertEqual(result['SUBs']['successful'][0]['ingested_data_size'], 246)
            # There is only one failed SUB
            self.assertEqual(result['SUBs']['failed'][0]['id'], failed_sub.pk)
            self.assertEqual(result['SUBs']['failed'][0]['status'], 'finished')
            self.assertIsNone(result['SUBs']['failed'][0]['ingested_data_size'])
    
            # There are just two dataproducts, each has a size of 123
            self.assertEqual(result['LTA dataproducts']['size__sum'], 246)
            # Just to check if the placeholder was added
            self.assertIsNotNone(result['SAPs exposure'])   # TODO: Implement test properly.
    
    
    if __name__ == "__main__":
        os.environ['TZ'] = 'UTC'
        unittest.main()