diff --git a/MAC/TBB/lib/tbb_util.py b/MAC/TBB/lib/tbb_util.py index e00929d9390cdc6a6947fc8c585776c11c1ebf1b..3756d9f3b62f5cb8ef75fd84980d046a09dc0eb1 100644 --- a/MAC/TBB/lib/tbb_util.py +++ b/MAC/TBB/lib/tbb_util.py @@ -24,6 +24,7 @@ def split_stations_by_boardnumber(stations): logging.debug("Board counts: %s" % stationslists) return stationslists + def expand_range(input_range): """ Take an input string that describes an integer range "7-29" and return the individual numbers in that range as strings in a list ["7", "8", ..., "29"]. @@ -55,6 +56,7 @@ def expand_range(input_range): items.append(str(element)) return items + def expand_list(input_list): """ Convert an input string of the form "1,17-33,64,1299-1344" to a list of strings that consists of each individual integer that is given and of all integers in every range. @@ -72,6 +74,7 @@ def expand_list(input_list): logging.debug("Input list: \"%s\"\nFound these elements in the list: \"%s\"" % (input_list, items)) return items + def calculate_adjusted_start_time(dm, start_time, sub_band): """ Calculate an adjusted start time for data in a sub-band in TBB memory. @@ -117,8 +120,9 @@ def calculate_adjusted_start_time(dm, start_time, sub_band): adjusted_start_time, full_seconds, slice_nr)) return (full_seconds, slice_nr) + # Todo: Consider to move this elsewhere, once we know where this parsing is supposed to happen eventually -def parse_parset_from_voevent(self, voevent): +def parse_parset_from_voevent(voevent): """ Parse the voevent for required values and create a parset from those. :param voevent: the voevent as xml string # see https://export.arxiv.org/pdf/1710.08155 @@ -132,13 +136,10 @@ def parse_parset_from_voevent(self, voevent): dm_value = dm_param.attrib['value'] trigger_version = root.attrib['version'] trigger_id = root.attrib['ivorn'] # todo: clarify: how does this relate to the RT TriggerID (in OTDB)? - time = root.find('./WhereWhen/ObsDataLocation/ObservationLocation/Time') + isotime = root.find('./WhereWhen/ObsDataLocation/ObservationLocation/AstroCoords/Time/TimeInstant/ISOTime').text coordsys = root.find('./WhereWhen/ObsDataLocation/ObservationLocation/AstroCoordSystem').attrib['id'] - - # todo: Check why the paper on arxiv differs from the example generated by Liam and which one is correct! ra = root.find('./WhereWhen/ObsDataLocation/ObservationLocation/AstroCoords/Position2D/Value2/C1').text # I bet there is a funny story about this structure... wtf?! dec = root.find('./WhereWhen/ObsDataLocation/ObservationLocation/AstroCoords/Position2D/Value2/C2').text - freq = root.find("./What/Group[@name='observatory parameters']/Param[@name='centre_frequency']").attrib['value'] # Todo: Parse all relevant information @@ -148,7 +149,7 @@ def parse_parset_from_voevent(self, voevent): 'Observation.TBB.TBBsetting.triggerType': 'FRB_VO', # todo: determine from event to allow other types? 'Observation.TBB.TBBsetting.triggerVersion': trigger_version, 'Observation.TBB.TBBsetting.triggerId': trigger_id, - 'Observation.TBB.TBBsetting.time': time, + 'Observation.TBB.TBBsetting.time': isotime, 'Observation.TBB.TBBsetting.fitDirectionCoordinateSystem': coordsys, 'Observation.TBB.TBBsetting.fitDirectionAngle1': ra, 'Observation.TBB.TBBsetting.fitDirectionAngle2': dec, @@ -160,4 +161,6 @@ def parse_parset_from_voevent(self, voevent): except Exception as ex: msg = 'Error while parsing VOEvent' logging.exception(msg) - raise KeyError("%s: %s" % (msg, ex)) \ No newline at end of file + raise KeyError("%s: %s" % (msg, ex)) + + diff --git a/SAS/TriggerServices/lib/trigger_service.py b/SAS/TriggerServices/lib/trigger_service.py index 5e4ba7616fe5f6ccc54a9b7812b7515d942eeadb..15b6c359fc9c887af4907a447c8299b7fe090f0a 100644 --- a/SAS/TriggerServices/lib/trigger_service.py +++ b/SAS/TriggerServices/lib/trigger_service.py @@ -43,6 +43,10 @@ from config import MOMQUERY_BUSNAME, MOMQUERY_SERVICENAME, \ from voevent_listener import VOEventListenerInterface from voevent_decider import ALERTDecider +from lofar.mac.tbb.tbb_util import parse_parset_from_voevent + +import dateutil.parser +import time import logging logger = logging.getLogger(__name__) @@ -179,8 +183,13 @@ def _observation_is_running_at(timestamp): # todo! # No idea how to do that without actually asking anyone? # Assume an RT triggered obs is actually happening and keep track of that? + + logger.warning("The check for a running observation at the given time is not yet implemented. Assuming True.") + # I interpret Sanders comment the way that the initial decision can be solely based on correct TBB mode, so I guess this is fine. return True + + def _get_current_tbbtype(): """ :return: string with current tbb operation mode @@ -188,7 +197,12 @@ def _get_current_tbbtype(): # todo! # No idea how to do that without actually asking anyone? # Have a thread permanently requesting updates on these things to keep the current state updated? - return True + + # This is probably a must-have for the initial implementation, so not making any assumptions here. + # I think Sander wants to be able to poll for this, so we should add a function to the tbb service, and then call + # that service from here, I guess. + return 'subband' + #raise NotImplementedError def _get_stations_recording_at(timestamp): @@ -198,24 +212,78 @@ def _get_stations_recording_at(timestamp): """ # todo! # No idea how to do that without actually asking anyone? - return [] + stations = ['today_nl'] + logger.warning("The check for a stations in use at the given time is not yet implemented. Assuming these: %s" % stations) + # I interpret Sanders comment the way that the initial decision can be solely based on correct TBB mode, so I guess this is fine. + return stations -def _tbb_dump_is_acceptable(project, stoptime, tbbtype, stations): + +def _tbb_trigger_is_acceptable(project, stoptime, tbbtype, stations): """ + Perform permission and system checks to determine whether we can actually perform a triggered TBB dump for the given project. + :param project: project code as string :param stoptime: isostring with freeze date :param tbbtype: string with current tbb operation mode - :return: + :return: True if acceptable, else False """ # todo: add some caching to these! - allowed = _auth_allows_triggers(project) and _quota_allows_triggers(project) - if allowed: - possible = _observation_is_running_at(stoptime) and (_get_current_tbbtype() == tbbtype) \ - and _get_stations_recording_at(stoptime) + if _auth_allows_triggers(project): + logger.info('TBB Trigger is authorized') + if _quota_allows_triggers(project): + logger.info('Trigger quota allows TBB dump') + if _get_current_tbbtype() == tbbtype: # todo: move to do_tbb_subband_dump in order to switch firmware when needed? + logger.info('TBB operation mode is %s' % tbbtype) + if _observation_is_running_at(stoptime): + logger.info('Observation is running at time %s' % stoptime) + if not set(stations).isdisjoint(_get_stations_recording_at(stoptime)): # todo: move to do_tbb_subband_dump since there is just no way that we have a wrong list here, and we probably should not even determine this list here in the first place? + logger.info('Requested stations are part of a running observation') + return True + else: + msg = 'The requested stations %s are not observing at %s' % (stations, stoptime) + logger.error(msg) + raise Exception(msg) + else: + msg = 'Project %s is not allowed to trigger' % project + logger.error(msg) + raise Exception(msg) + else: + msg = "TBB operation mode is not '%s'." % tbbtype + logger.error(msg) + raise Exception(msg) + else: + msg = 'Trigger quota of project %s is exceeded' % project + logger.error(msg) + raise Exception(msg) + else: + msg = 'Project %s is not allowed to trigger' % project + logger.error(msg) + raise Exception(msg) + -def do_TBB_subband_dump(starttime, duration, dm, project, triggerid, - stoptime=None, stations=None, subbands=None, rcus=None, boards=None, tbbtype=None, voevent_xml=None): +def translate_arrival_time_to_frequency(reference_time, reference_frequency, dm, target_frequency=200): + """ + For a signal with specified dm, which has arrival time original_time at original_frequency, determine the arrival + time at target_frequency. + + :param original_time: timestamp in seconds since epoch as float + :param original_frequency: frequency in Mhz as integer + :param target_frequency: frequency in Mhz as integer + :param dm: dispersion measure in pc/cm^3 as integer + :return: arrival time of the original signal at target frequency as float + """ + dm_constant = 4.148808e3 # MHz^2 pc^-1 cm^3 sec + delay = dm * dm_constant * (1.0/target_frequency**2 - 1.0/reference_frequency**2) # sec + target_time = reference_time + delay + logger.info('Delay is %s seconds. Arrival times: %s @ %s Mhz -> %s @ %s Mhz' % (delay, reference_time, reference_frequency, target_time, target_frequency)) + return target_time + + +# todo: If the Qpid communication does not hurt, maybe it makes more sense to move this to the TBB Service? +def do_tbb_subband_dump(starttime, duration, dm, project, triggerid, + stoptime=None, stations=None, subbands=None, rcus=None, + boards=None, tbbtype=None, voevent_xml=None): """ This 'recipe' calls the tbb service to freeze data on the bords and perform the tbbdump. @@ -234,35 +302,40 @@ def do_TBB_subband_dump(starttime, duration, dm, project, triggerid, :param voevent_xml: """ # todo: The ticket says there should be an optional tbbtype argument. How is this supposed to work? - # todo: Either we do this specifically for subband mode or this cannot be optional...? + # todo: ...Either we do this specifically for subband mode or this cannot be optional...? raise NotImplementedError - # todo in SW-560 -- Note that many of these things were commented by Sander in SW-559 - # todo: Calculate delay from DM at LOFAR frequency -> clarify if really required here, since we pass dm to the freeze command already. - # Delays are calculated using the reference frequency [in GHz], and reference time from the VOevent to the stop time (center FRB time, delayed to 200 MHz + 2.5 seconds [half buffer time]). - # - # obtain DM, reference frequency and FRBtime from VOevent - # DMconstant=4.148808e-3 # seconds - # v_lofar=0.2 # GHz = 200 MHz - # buffer_time=5 - # post_FRB_time=buffer_time/2 - # delay= DM * DMconstant * (1/v_lofar^2 - 1/v_ref^2 ) + post_FRB_time - # stoptime=FRBtime+delay - # starttime=FRBtime+delay - buffer_time - # todo SW-560: Craft and send a freeze message directly using the python wrapper modules around tbbctl to freeze the appropriate TBB boards. - # -------------------------- + # todo SW-560: Note that many of these things were commented by Sander in SW-559, I copied most of it where it should go in the code + # todo SW-560: Craft and send a freeze message directly using the python wrapper module around tbbctl to freeze the appropriate TBB boards. + # Note: We should use the tbb service for that (maybe it is faster to first trigger the freeze directly with the wrapper script? + # If we move this to the TBB service, we should call the according functions directly, instead, but the rpc can be used from the outside: + # e.g. + # from lofar.mac.tbbservice.client.tbbservice_rpc import TBBRPC + # with TBBRPC() as rpc: + # import time + # from lofar.mac.tbb.tbb_util import expand_list + # logger.info(rpc.start_datawriters()) + # logger.info(rpc.switch_firmware("de601c", "subband")) + # logger.info(rpc.set_storage({"de601c": "somecepnode"})) + # logger.info(rpc.start_recording("de601c", "subband", "1:48")) + # logger.info(rpc.restart_recording("de601c", "subband", "1:48")) + # sec, nsec = ("%.9f" % time.time()).split(".") + # sec = int(sec) + # nsec = int(nsec) + # logger.info(rpc.freeze_data("de601c", 4.2, sec, nsec)) + # logger.info(rpc.upload_data("de601c", 4.2, time.time(), 1, "1:48", 1, expand_list("0,2-3,5"))) + # logger.info(rpc.release_data("de601c")) class ALERTHandler(VOEventListenerInterface): """ - This class implements the VOEventListenerInterface in order to receive VO events. - It handles FRB events for the ALERT project. + This class implements the VOEventListenerInterface in order to receive VO events for the ALERT project. """ - project = "ALERT" # todo: set correct project code tbbtype = "subband" - stations = [] # todo SW-560! + stations = ['today_nl'] # todo SW-560! The ticket implies that this should be set to all dutch stations used in observation (we could filter _get_stations_recording_at() response). But why then check later that the requested stations are actually part of the observation? + # Note that the TBB service currently expects the LCU names (trailing 'c') in a comma-separated string def handle_event(self, voevent_xml, voevent_etree): @@ -271,22 +344,38 @@ class ALERTHandler(VOEventListenerInterface): logger.info('%s' % voevent_xml) # check if trigger is allowed to be accepted(marshal permissions etc,) - stoptime = None # todo! clarify: is this stoptime actually the same stoptime as the one for do_TBB_subband_dump? - if _tbb_dump_is_acceptable(self.project, stoptime, self.tbbtype, self.stations): + # Note: So we first have to interpret the event at this stage already just to get the actual time when the + # freeze should happen so we can check if the event should be accepted...? + # Meh. I'd prefer this to happen in the recipe, so that we can separate the marshaling from the handling. + # Anyway, here we go: + parset = parse_parset_from_voevent(voevent_xml) + dm = float(parset['Observation.TBB.TBBsetting.triggerDispersionMeasure']) + triggerid = parset['Observation.TBB.TBBsetting.triggerId'] + reference_frequency = float(parset['Observation.TBB.TBBsetting.referenceFrequency']) + timestr = parset['Observation.TBB.TBBsetting.time'] + reference_time = time.mktime(dateutil.parser.parse(timestr).timetuple()) # convert time string to seconds + centertime = translate_arrival_time_to_frequency(reference_time, reference_frequency, dm, target_frequency=200) + duration = 5 # tbb can hold this many seconds + starttime = centertime - duration / 2.0 + stoptime = centertime - duration / 2.0 + + if _tbb_trigger_is_acceptable(self.project, stoptime, self.tbbtype, self.stations): + logger.info('System pre-flight checks passed') # check if trigger is acceptable for PI decider = ALERTDecider() if decider.is_acceptable(voevent_etree): + logger.info('Science pre-flight checks passed') - # todo: use MAC/TBB/lib/tbb_til.py -> parse_parset_from_voevent() to get these things. - starttime = None - duration = None - dm = None - triggerid = None - do_TBB_subband_dump(starttime, duration, dm, self.project, triggerid) + logger.info('Initiating TBB dump for trigger id %s, starttime %s, duration %s, dm %s' % (triggerid, starttime, duration, dm)) + do_tbb_subband_dump(starttime, duration, dm, self.project, triggerid, stoptime=stoptime, stations=self.stations) + else: + raise Exception('Science pre-flight checks rejected the trigger!') + else: + raise Exception('System pre-flight checks rejected the trigger!') except Exception as ex: - logger.exception("An error occurred whyle handling the event: %s" % ex) + logger.exception("An error occurred while handling the event: %s" % ex) logger.info('...done handling ALERT event.') @@ -300,9 +389,10 @@ def create_service(servicename=TRIGGER_SERVICENAME, busname=TRIGGER_BUSNAME): def main(): - with create_service() as s: + with create_service(): # handle vo events - arts_handler = ALERTHandler() # todo: configure broker host/port and Package_Type integer as provided by ARTS - arts_handler.start_listening() # todo: start in own thread? + # todo: configure broker host/port and filter_for Package_Type integer as provided by ARTS + arts_handler = ALERTHandler(broker_host='127.0.0.1', broker_port=8099, filter_for=None) + arts_handler.start_listening() waitForInterrupt() diff --git a/SAS/TriggerServices/lib/voevent_decider.py b/SAS/TriggerServices/lib/voevent_decider.py index 9d7cb8b9089e47c4fed49e98440e93c2815ed854..6e067f6846b0850c8b9d9190364b6ce99d8b3d8c 100644 --- a/SAS/TriggerServices/lib/voevent_decider.py +++ b/SAS/TriggerServices/lib/voevent_decider.py @@ -20,6 +20,11 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +""" +This module contains the logic to decide whether it makes scientifically sense to accept a certain vo event. +""" + import logging import datetime from lxml import etree @@ -27,26 +32,30 @@ logger = logging.getLogger(__name__) class DeciderInterface: """ - A simple interface that should be implemented by a decider class to accept or reject events based on custom logic. + A simple interface that should be implemented by a decider class to accept or reject events. + Note: This is not the place to perform permission or system state checks, but rather things like e.g. whether an + event is visible or interesting enough to be observed. + + Example: + --- + class AcceptAllDecider(DeciderInterface): + def is_acceptable(self, voevent_root): + logger.info('Accepting all events.') + return True + --- """ def is_acceptable(self, voevent_root): """ - This should be overwritten by the PI, implementing the logic that decise whether the provided - VO event should be accepted or not + This should be overwritten by project-specific logic that decide whether the provided VO event should be + accepted or not. This logic should usually be provided by the PI. + :param voevent_root: the root node of the event as an lxml etree :return: True to accept the event, False to reject/drop it """ raise NotImplementedError -class AcceptAllDecider(DeciderInterface): - """ - A simple decider that accepts everything - """ - def is_acceptable(self, voevent_root): - return True - class ALERTDecider(DeciderInterface): """ @@ -54,6 +63,9 @@ class ALERTDecider(DeciderInterface): """ def is_acceptable(self, voevent_root): + + + logger.warning('Accepting event no matter what! There is nothing smart going on here yet.') # todo: Check if the event has the test-flag set. If the test-flag is set, do not send the freeze event but just log the event information. # todo: Stoptime is less than half an hour away. return True diff --git a/SAS/TriggerServices/lib/voevent_listener.py b/SAS/TriggerServices/lib/voevent_listener.py index a4903ceb811b3b27c4396301d5b5f58c8474da28..0e90e0c97e737fbcfba4975bca8a48094c60cf64 100644 --- a/SAS/TriggerServices/lib/voevent_listener.py +++ b/SAS/TriggerServices/lib/voevent_listener.py @@ -20,10 +20,15 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +""" +This module contains functionality to register at a VO event broker and implement handlers to act on arriving events. +""" + import logging import argparse import gcn.notice_types import time +import threading logger = logging.getLogger(__name__) @@ -54,13 +59,15 @@ def listen_for_voevents(handler_function, broker_host, broker_port, filter_for=N handler = gcn.include_notice_types(*filter_for)(handler_function) else: logger.info('Will listen for all Packet_Types') - print(broker_host, broker_port, handler_function) + + logger.info("Now starting to listen on %s:%s, passing events to %s" % (broker_host, broker_port, handler_function)) gcn.listen(host=broker_host, port=broker_port, handler=handler_function) class VOEventListenerInterface(object): """ - An interface that should be implemented to listen to particular events and handle them + An interface that should be implemented to listen to particular events and handle them. + See _SimpleVOEventListener for an example implementation. """ def __init__(self, broker_host='127.0.0.1', broker_port=8099, filter_for=None): """ @@ -83,16 +90,21 @@ class VOEventListenerInterface(object): """ raise NotImplementedError("Override this function with your custom behavior to react to this event!") - def start_listening(self): - listen_for_voevents(self.handle_event, self.broker_host, self.broker_port, self.filter_for) - + def start_listening(self, blocking=False): + args = (self.handle_event, self.broker_host, self.broker_port, self.filter_for) + if blocking: + listen_for_voevents(*args) + else: + t = threading.Thread(target=listen_for_voevents, args=args) + t.daemon = True + t.start() class _SimpleVOEventListener(VOEventListenerInterface): """ A simple stand-alone vo-event handler - Note: This is here for testing and demonstration purposes. - You should write your own handler function that you pass to listen_for_voevents(). + Note: This is here for manual testing and demonstration purposes. + You should usually write your own listener class by extending VOEventListenerInterface. """ def __init__(self, write_to_file=False, file_path=None, *args, **kwargs): diff --git a/SAS/TriggerServices/test/t_voevent_listener.py b/SAS/TriggerServices/test/t_voevent_listener.py index 229ae51ac6e9d0223f5e67d3f4901a4719f5d82e..ebeea09f65a2d5ba17b2380b708b98615d4ded4c 100644 --- a/SAS/TriggerServices/test/t_voevent_listener.py +++ b/SAS/TriggerServices/test/t_voevent_listener.py @@ -20,6 +20,7 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + import unittest import os import shutil @@ -58,7 +59,7 @@ class TestVOEventListener(unittest.TestCase): print('Cleaning up...') os.remove(tmppath) shutil.rmtree(DB_PATH) - subprocess.call('pkill -f "twistd comet"', shell=True) + subprocess.call('pkill -f "twistd comet"', shell=True) # todo: make this more specific? def test_received_voevent_is_identical_to_sent_voevent(self):