diff --git a/SAS/TMSS/src/tmss/tmssapp/adapters/sip.py b/SAS/TMSS/src/tmss/tmssapp/adapters/sip.py
index 3f64f709a0f7bc81b10f96220f63cc9abdb50f85..50e2a205555195be071027cdc0894164c2726335 100644
--- a/SAS/TMSS/src/tmss/tmssapp/adapters/sip.py
+++ b/SAS/TMSS/src/tmss/tmssapp/adapters/sip.py
@@ -1,5 +1,5 @@
 from lofar.sas.tmss.tmss.exceptions import *
-from lofar.sas.tmss.tmss.tmssapp.models.scheduling import Dataproduct, SubtaskType, Subtask
+from lofar.sas.tmss.tmss.tmssapp.models.scheduling import Dataproduct, SubtaskType, Subtask, SubtaskOutput
 from lofar.sas.tmss.tmss.tmssapp.models.specification import Datatype, Dataformat
 from lofar.lta.sip import siplib, ltasip, validator, constants
 
@@ -8,6 +8,59 @@ import logging
 import isodate
 
 logger = logging.getLogger(__name__)
+# Translation of the TMSS antenna_set names to the corresponding LTA SIP antennaset constants.
+mapping_antennaset_type_TMSS_2_SIP = {
+    "HBA_DUAL": constants.ANTENNASETTYPE_HBA_DUAL,
+    "HBA_DUAL_INNER": constants.ANTENNASETTYPE_HBA_DUAL_INNER,
+    "HBA_JOINED": constants.ANTENNASETTYPE_HBA_JOINED,
+    "HBA_JOINED_INNER": constants.ANTENNASETTYPE_HBA_JOINED_INNER,
+    "HBA_ONE": constants.ANTENNASETTYPE_HBA_ONE,
+    "HBA_ONE_INNER": constants.ANTENNASETTYPE_HBA_ONE_INNER,
+    "HBA_ZERO": constants.ANTENNASETTYPE_HBA_ZERO,
+    "HBA_ZERO_INNER": constants.ANTENNASETTYPE_HBA_ZERO_INNER,
+    "LBA_INNER": constants.ANTENNASETTYPE_LBA_INNER,
+    "LBA_OUTER": constants.ANTENNASETTYPE_LBA_OUTER,
+    "LBA_SPARSE_EVEN": constants.ANTENNASETTYPE_LBA_SPARSE_EVEN,
+    "LBA_SPARSE_ODD": constants.ANTENNASETTYPE_LBA_SPARSE_ODD,
+    # NOTE(review): placeholder text, not a siplib constant; presumably not a valid SIP antennaset - confirm
+    "LBA_ALL": "NOT SUPPORTED IN LTA YET"
+}
+
+# Translation of the TMSS filter names to the corresponding LTA SIP filterselection constants.
+mapping_filterset_type_TMSS_2_SIP = {
+    "LBA_10_90": constants.FILTERSELECTIONTYPE_10_90_MHZ,
+    "LBA_30_90": constants.FILTERSELECTIONTYPE_30_90_MHZ,
+    "HBA_110_190": constants.FILTERSELECTIONTYPE_110_190_MHZ,
+    "HBA_210_250": constants.FILTERSELECTIONTYPE_210_250_MHZ
+}
+
+def get_number_of_dataproducts_of_type(subtask, dataproduct_datatype):
+    """
+    Count the dataproducts with the given dataformat that are produced by the outputs of the given subtask.
+    :param subtask: the Subtask whose SubtaskOutputs are looked up (via its id)
+    :param dataproduct_datatype: despite its name this is matched against Dataproduct.dataformat,
+                                 so pass a Dataformat value like Dataformat.Choices.MEASUREMENTSET.value
+    :return: the number of matching dataproducts (0 if the subtask has no outputs or no matching dataproducts)
+    """
+    # Let the database do the counting in a single query, instead of fetching
+    # all dataproduct rows for each subtask output just to take their len().
+    subtask_output_ids = SubtaskOutput.objects.filter(subtask_id=subtask.id).values_list('id', flat=True)
+    return Dataproduct.objects.filter(producer_id__in=subtask_output_ids,
+                                      dataformat=dataproduct_datatype).count()
+
+
+def get_siplib_stations_list(subtask):
+    """
+    Build the list of siplib Station objects from the station_list and the antenna_set of the subtask.
+    TODO Correct mapping of all different HBA/LBA antennaset flavours to antenna fieldtypes required for SIP
+    :param subtask: Subtask with 'station_list' and 'antenna_set' in specifications_doc['stations']
+    :return: list with one siplib.Station.preconfigured(...) result per entry in the station_list
+    """
+    stations_spec = subtask.specifications_doc['stations']
+    # coarse mapping for now: any antenna_set containing "HBA" gives HBA fields, everything else LBA
+    antennafieldtypes = ["HBA"] if "HBA" in stations_spec['antenna_set'] else ["LBA"]
+    return [siplib.Station.preconfigured(station, antennafieldtypes) for station in stations_spec['station_list']]
+
 
 # todo: how do we handle IDs? ...use ID service for new dataproducts? ...determine IDs of already ingested dataproducts?
 # todo: replace the following hack after a conclusion is reached on how to handle IDs.
@@ -66,7 +119,7 @@ def create_sip_representation_for_subtask(subtask: Subtask): name = str(subtask.id) process_map = siplib.ProcessMap(strategyname=subtask.specifications_template.name, strategydescription=subtask.specifications_template.description, - starttime=subtask.start_time.isoformat(), + starttime=subtask.start_time, duration=isodate.duration_isoformat(subtask.stop_time-subtask.start_time), identifier=subtask_sip_identifier, observation_identifier=subtask_sip_identifier, @@ -74,28 +127,29 @@ def create_sip_representation_for_subtask(subtask: Subtask): # determine subtask specific properties and add subtask representation to Sip object if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: - observation = siplib.Observation(observingmode="Beam Observation", # can be hardcoded - instrumentfilter="110-190 MHz", # todo: from subtask.specifications_doc, needs conversion - clock_frequency="200", # fixed, - clock_frequencyunit="MHz", # fixed, consider Hz? 
- stationselection="Custom", # fixed - antennaset="HBA Dual", # from subtask.specifications_doc - timesystem="UTC", # fixed - numberofstations=2, # todo: len(subtask.specifications_doc['stations']['station_list']) - stations=[siplib.Station.preconfigured("RS205", ["HBA"]), siplib.Station.preconfigured("RS210", ["HBA"])], # todo: subtask.specifications_doc['stations']['station_list', antennafield_list needs to be derived from antennaset + stations type, JD can help - numberofsubarraypointings=0, # todo: subtask.specifications_doc - numberoftbbevents=0, # fixed - numberofcorrelateddataproducts=0, # todo: iterate subtask.outputs - numberofbeamformeddataproducts=0, # todo: iterate subtask.outputs - numberofbitspersample=8, # fixed + observation = siplib.Observation(observingmode=constants.OBSERVINGMODETYPE_BEAM_OBSERVATION, # can be hardcoded for an observation + instrumentfilter=mapping_filterset_type_TMSS_2_SIP[subtask.specifications_doc['stations']['filter']], + clock_frequency="200", # fixed, + clock_frequencyunit=constants.FREQUENCYUNIT_MHZ, # fixed, consider Hz? 
+ stationselection=constants.STATIONSELECTIONTYPE_CUSTOM, # fixed + antennaset=mapping_antennaset_type_TMSS_2_SIP[subtask.specifications_doc['stations']['antenna_set']], + timesystem="UTC", # fixed + numberofstations=len(subtask.specifications_doc['stations']['station_list']), + stations=get_siplib_stations_list(subtask), + numberofsubarraypointings=len(subtask.specifications_doc['stations']['digital_pointings']), + numberoftbbevents=0, # fixed + numberofcorrelateddataproducts=get_number_of_dataproducts_of_type(subtask, Dataformat.Choices.MEASUREMENTSET.value), + numberofbeamformeddataproducts=get_number_of_dataproducts_of_type(subtask, Dataformat.Choices.BEAMFORMED.value), + numberofbitspersample=8, # fixed process_map=process_map, - channelwidth_frequency=None, # todo, subtask.specifications_doc - channelwidth_frequencyunit="Hz", # fixed + channelwidth_frequency=None, # NA any more ('BlueGene compatibility' see comment in LTA-SIP.xsd) + channelwidth_frequencyunit=constants.FREQUENCYUNIT_HZ, # fixed observationdescription=subtask.task_blueprint.description, - channelspersubband=None, # todo, subtask.specifications_doc - subarraypointings=None, # todo, subtask.specifications_doc, probably more complex than it looks + channelspersubband=0, # NA any more ('BlueGene compatibility' see comment in LTA-SIP.xsd) + subarraypointings=None, # todo, subtask.specifications_doc, probably more complex than it looks -> RGOE yes complex type for later transientbufferboardevents=None # fixed ) + return observation elif subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: sourcedata_identifiers = [] @@ -110,14 +164,15 @@ def create_sip_representation_for_subtask(subtask: Subtask): sourcedata_identifiers=sourcedata_identifiers, process_map=process_map) - if subtask.specifications_template.name == "pipelinecontrol schema": # todo: re-evaluate this, it is not "preprocessing schema" as we thought... 
+ if subtask.specifications_template.name == "pipelinecontrol schema": # todo: re-evaluate this because schema name might change pipeline = siplib.AveragingPipeline( # <-- this is what we need for UC1 pipeline_map, - numberofcorrelateddataproducts = 1, # todo, walk subtask.outputs - frequencyintegrationstep = 1, #todo, # todo, subtask.specifications_doc - timeintegrationstep = 1, #todo, subtask.specifications_doc - flagautocorrelations = True, #todo, subtask.specifications_doc - demixing = False) #todo, subtask.specifications_doc + numberofcorrelateddataproducts=get_number_of_dataproducts_of_type(subtask, Dataformat.Choices.MEASUREMENTSET.value), + frequencyintegrationstep=subtask.specifications_doc['demixer']['frequency_steps'] if 'demix' in subtask.task_blueprint.specifications_doc else 0, + timeintegrationstep=subtask.specifications_doc['demixer']['time_step'] if 'demix' in subtask.task_blueprint.specifications_doc else 0, + flagautocorrelations=subtask.task_blueprint.specifications_doc["flag"]["autocorrelations"], + demixing=True if 'demix' in subtask.task_blueprint.specifications_doc else False + ) # todo: distinguish and create other pipeline types. Probably most of these can be filled in over time as needed, # but they are not required for UC1. Here are stubs to start from for the other types the LTA supports: # elif subtask.specifications_template.name == "<???>": # todo @@ -158,7 +213,8 @@ def create_sip_representation_for_subtask(subtask: Subtask): # skipdynamicspectrum=False, #todo # skipprefold=True) #todo else: - logger.warning('Could not identify LTA pipeline flavor for subtask=%s, template name=%s). Adding as SimplePipeline to SIP.' % (subtask, subtask.specifications_template.name)) + logger.warning('Could not identify LTA pipeline flavor for subtask=%s, template name=%s). Adding as SimplePipeline to SIP.' 
+ % (subtask, subtask.specifications_template.name)) pipeline = siplib.SimplePipeline(pipeline_map) return pipeline else: @@ -213,6 +269,7 @@ def create_sip_representation_for_dataproduct(dataproduct: Dataproduct): storage_writer_version='Unknown', # todo: not modeled? needs to come from feedback eventually. process_identifier=create_fake_identifier_for_testing(unique_id=dataproduct.producer.subtask.id)) + # next TODOs: TMSS-300 if dataproduct.dataformat.value == Dataformat.Choices.MEASUREMENTSET.value: # <- This is the only one we currently need for UC1 sip_dataproduct = siplib.CorrelatedDataProduct( dataproduct_map, diff --git a/SAS/TMSS/test/test_utils.py b/SAS/TMSS/test/test_utils.py index 5f515a7c97a09a2e276570b177517c24b25ea49c..c5ede9d9882ab729ed96311b4fe130ddfa4c766e 100644 --- a/SAS/TMSS/test/test_utils.py +++ b/SAS/TMSS/test/test_utils.py @@ -21,6 +21,7 @@ import os import time +import datetime from multiprocessing import Process, Event import django @@ -53,9 +54,13 @@ def assertDataWithUrls(self, data, expected): err_msg = "The value '%s' (key is %s) is not in expected %s" % (str(v), str(data[k]), k) self.assertTrue(str(v) in data[k], err_msg) + elif isinstance(v, datetime.datetime): + # URL (data[k]) is string but the test_data object (v) is datetime format, convert latter to string format to compare + self.assertEqual(v.isoformat(), data[k]) else: self.assertEqual(v, data[k]) + def assertUrlList(self, url_list, expected_objects): """ object instances get returned as urls, check that the expected projects are in that list diff --git a/SAS/TMSS/test/tmss_test_data_django_models.py b/SAS/TMSS/test/tmss_test_data_django_models.py index f0de0f4042b50353cc90d762e1c13f33263aba4e..7b8b38a389c324c5b4ba9519f8135d56c22ec541 100644 --- a/SAS/TMSS/test/tmss_test_data_django_models.py +++ b/SAS/TMSS/test/tmss_test_data_django_models.py @@ -30,7 +30,7 @@ which is automatically destroyed at the end of the unittest session. 
from lofar.sas.tmss.tmss.tmssapp import models from lofar.common.json_utils import get_default_json_object_for_schema -from datetime import datetime +from datetime import datetime, timedelta import uuid import json @@ -331,7 +331,7 @@ def Subtask_test_data(task_blueprint: models.TaskBlueprint=None, subtask_templat start_time = datetime.utcnow() if stop_time is None: - stop_time = datetime.utcnow() + stop_time = datetime.utcnow() + timedelta(minutes=10) if cluster is None: cluster = models.Cluster.objects.create(name="dummy cluster", location="downstairs", tags=[]) diff --git a/SAS/TMSS/test/tmss_test_environment_unittest_setup.py b/SAS/TMSS/test/tmss_test_environment_unittest_setup.py index 98375bb80e3b66b19320ef3c129d4757f1bbc7b6..419101c8696849ffeafd6d0c7403e2fc960abe7b 100644 --- a/SAS/TMSS/test/tmss_test_environment_unittest_setup.py +++ b/SAS/TMSS/test/tmss_test_environment_unittest_setup.py @@ -46,6 +46,7 @@ def tearDownModule(): import json import requests +import datetime AUTH = requests.auth.HTTPBasicAuth(tmss_test_env.ldap_server.dbcreds.user, tmss_test_env.ldap_server.dbcreds.password) BASE_URL = tmss_test_env.django_server.url OIDC_URL = tmss_test_env.django_server.oidc_url @@ -91,7 +92,11 @@ def _call_API_and_assert_expected_response(test_instance, url, call, data, expec test_instance.assertTrue(str(value) in r_dict[key]) elif type(value) is list: test_instance.assertEqual(sorted(value), sorted(r_dict[key]), msg="lists differ for key=%s"%key) # compare lists independent of ordering + elif isinstance(value, datetime.datetime): + # URL (r_dict[key]) is string but the test_data object (value) is datetime format, convert latter to string format to compare + test_instance.assertEqual(value.isoformat(), r_dict[key]) else: + test_instance.assertEqual(value, r_dict[key]) return r_dict