From 222b5d0b5317de559b00bcd0ea033d956fd9e727 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Fri, 18 Jan 2019 14:43:51 +0000 Subject: [PATCH] SW-567: added and used TaskInfoCache for fast lookup of info needed to accept/reject tbb alert voevents. The TaskInfoCache preloads all current information upon start, and then resfreshes itself on task status events, so it's always up to date. --- .gitattributes | 1 + SAS/TriggerServices/CMakeLists.txt | 2 +- SAS/TriggerServices/lib/CMakeLists.txt | 1 + SAS/TriggerServices/lib/config.py | 4 + SAS/TriggerServices/lib/task_info_cache.py | 241 ++++++++++++++++++++ SAS/TriggerServices/lib/trigger_service.py | 172 +++++++------- SAS/TriggerServices/lib/voevent_listener.py | 22 +- 7 files changed, 352 insertions(+), 91 deletions(-) create mode 100644 SAS/TriggerServices/lib/task_info_cache.py diff --git a/.gitattributes b/.gitattributes index 332bf479088..e23eedc2712 100644 --- a/.gitattributes +++ b/.gitattributes @@ -5289,6 +5289,7 @@ SAS/TriggerServices/doc/trigger_services.md -text SAS/TriggerServices/lib/CMakeLists.txt -text SAS/TriggerServices/lib/__init__.py -text SAS/TriggerServices/lib/config.py -text +SAS/TriggerServices/lib/task_info_cache.py -text SAS/TriggerServices/lib/trigger_cancellation_service.py -text SAS/TriggerServices/lib/trigger_service.py -text SAS/TriggerServices/lib/trigger_service_rpc.py -text diff --git a/SAS/TriggerServices/CMakeLists.txt b/SAS/TriggerServices/CMakeLists.txt index bc88c6986a3..f263e4dd89c 100644 --- a/SAS/TriggerServices/CMakeLists.txt +++ b/SAS/TriggerServices/CMakeLists.txt @@ -1,6 +1,6 @@ lofar_find_package(Python 2.7 REQUIRED) -lofar_package(TriggerServices 0.1 DEPENDS PyMessaging MoMQueryService SpecificationServices OTDB_Services) +lofar_package(TriggerServices 0.1 DEPENDS PyMessaging MoMQueryService SpecificationServices OTDB_Services ResourceAssignmentService TBB TBBClient) include(PythonInstall) diff --git a/SAS/TriggerServices/lib/CMakeLists.txt b/SAS/TriggerServices/lib/CMakeLists.txt index be9caf7637a..3ea84461ee7 100644 --- a/SAS/TriggerServices/lib/CMakeLists.txt +++ b/SAS/TriggerServices/lib/CMakeLists.txt @@ -9,6 +9,7 @@ set(_py_files trigger_cancellation_service.py voevent_listener.py voevent_decider.py + task_info_cache.py config.py ) diff --git a/SAS/TriggerServices/lib/config.py b/SAS/TriggerServices/lib/config.py index cdee7d04940..c5894750c93 100644 --- a/SAS/TriggerServices/lib/config.py +++ b/SAS/TriggerServices/lib/config.py @@ -38,5 +38,9 @@ ALERT_PACKET_TYPE_FILTER = None # list of int or None for all DEFAULT_TBB_CEP_NODES = None # list of nodes to dump to, e.g. ["cpu%s" % str(num).zfill(2) for num in expand_list("01-50")], or None for all available DEFAULT_TBB_SUBBANDS = "10-496" # The subbands to dump. Note: When starting the recording (tbbservice_start_recording), the subband range HAS to cover 487 subbands (typically 10-496) DEFAULT_TBB_STATIONS = ['cs001','cs002','cs003','cs004','cs005','cs006','cs007','cs011','cs013','cs017','cs021','cs024','cs026','cs028','cs030','cs031','cs032','cs101','cs103','cs201','cs301','cs302','cs401','cs501','rs106','rs205','rs208','rs210','rs305','rs306','rs307','rs310','rs406','rs407','rs409','rs503','rs508','rs509'] # List of stations to include in tbb dump (filtered for those who are actually observing at event ToA), or None for all observing. + +DEFAULT_TBB_PROJECT = "COM_ALERT" +DEFAULT_TBB_ALERT_MODE = "subband" + DEFAULT_CONSTANT_TIME_BETWEEN_SUBBAND_DUMPS = 0 # constant time in seconds to wait between subband dumps DEFAULT_DURATION_FACTOR_BETWEEN_SUBBAND_DUMPS = 0.00012 # linear scaling factor to increase time between dumps with duration \ No newline at end of file diff --git a/SAS/TriggerServices/lib/task_info_cache.py b/SAS/TriggerServices/lib/task_info_cache.py new file mode 100644 index 00000000000..1d2cfadaaf9 --- /dev/null +++ b/SAS/TriggerServices/lib/task_info_cache.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python + +# Copyright (C) 2015-2017 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# $Id$ + +""" +task_info_cache is a module which provides the TaskInfoCache class which caches the info for the current active tasks (observation/pipeline)""" + +from lofar.sas.otdb.OTDBBusListener import OTDBBusListener +from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT + +from lofar.sas.otdb.otdbrpc import OTDBRPC +from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME + +from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC +from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME + +from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC +from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME +from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME + +from lofar.common.lcu_utils import get_current_stations +from pprint import pformat +from datetime import datetime, timedelta + +import logging +logger = logging.getLogger(__name__) + +class TaskInfo(object): + def __init__(self, parset, mom_task, mom_project, radb_task): + self.parset = parset + self.mom_task = mom_task + self.mom_project = mom_project + self.radb_task = radb_task + + def __str__(self): + return pformat(self.parset) + '\n' + \ + pformat(self.mom_task) + '\n' + \ + pformat(self.mom_project) + '\n' + \ + pformat(self.radb_task) + +class TaskInfoCache(OTDBBusListener): + """ + """ + + def __init__(self, + otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME, + otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT, + otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME, + otdb_servicename=DEFAULT_OTDB_SERVICENAME, + radb_busname=RADB_BUSNAME, + radb_servicename=RADB_SERVICENAME, + mom_busname=DEFAULT_MOMQUERY_BUSNAME, + mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, + broker=None, + radb_dbcreds=None): + """ + Creates a TaskInfoCache instance, which listens for OTDB task status events, and then fetches and caches relevant info for the current active task(s). + + :param otdb_notification_busname: + :param otdb_notification_subject: + :param otdb_busname: + :param otdb_servicename: + :param mom_busname: + :param mom_servicename: + :param broker: + :param radb_dbcreds: + """ + + # init the OTDBBusListener + super(TaskInfoCache, self).__init__(busname=otdb_notification_busname, subject=otdb_notification_subject)#, broker=broker) + + # the internal cache is a dict with a mapping of otdb_id->TaskInfo + self._cache = {} + + # the internal project cache is a dict with a mapping of project_name->project_info_dict + self._project_cache = {} + + # the internal stations cache is a list of the currently used stations + self._stations_cache = [] + + # internal rpc's to fetch the needed information + self._otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180) + self._momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker, timeout=180) + self._radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker, timeout=180) + + def get_cached_tasks_otdb_ids(self): + return self._cache.keys() + + def get_active_tasks(self, active_at, task_type=None): + ''' + get a list of tasks which are active at the given timestamp (t.start <= active_at <= t.end) + :param active_at: datetime + :param task_type: string like 'observation' or 'pipeline' to filter by task type. No filtering is applied when task_type=None. + :return: list of active TaskInfo's + ''' + tasks = [ti for ti in self._cache.values() + if ti.radb_task['starttime'] <= active_at and ti.radb_task['endtime'] >= active_at] + + if task_type is not None: + tasks = [ti for ti in tasks + if ti.radb_task['task_type'] == task_type] + + return tasks + + def get_task_info(self, otdb_id): + return self._cache[int(otdb_id)] + + def get_project_info(self, project_name): + return self._project_cache[project_name] + + def get_project_names(self): + return sorted(self._project_cache.keys()) + + def get_stations(self): + return self._stations_cache + + def start_listening(self, numthreads=None): + logger.info("TaskInfoCache starting to listening for upcoming tasks...") + self._otdbrpc.open() + self._momrpc.open() + self._radbrpc.open() + super(TaskInfoCache, self).start_listening() + + # make sure we start with a filled projects/stations cache + self._update_projects_cache() + self._update_stations_cache() + self._update_active_tasks_cache() + logger.info("TaskInfoCache is ready for use, listening for upcoming tasks, and preloaded with projects, stations and active tasks.") + + def stop_listening(self): + self._otdbrpc.close() + self._momrpc.close() + self._radbrpc.close() + super(TaskInfoCache, self).stop_listening() + logger.info("TaskInfoCache stopped listening for upcoming tasks.") + + def onObservationScheduled(self, otdb_id, modificationTime): + """ overrides OTDBBusListener.onObservationQueued and calls self._add_task_to_cache """ + return self._update_semi_static_cache_and_add_task_to_cache(otdb_id) + + def onObservationQueued(self, otdb_id, modificationTime): + """ overrides OTDBBusListener.onObservationQueued and calls self._add_task_to_cache """ + # update internal project/station cache (could have been updated by a user in the meantime) + return self._update_semi_static_cache_and_add_task_to_cache(otdb_id) + + def onObservationStarted(self, otdb_id, modificationTime): + """ overrides OTDBBusListener.onObservationStarted and calls self._add_task_to_cache """ + return self._update_semi_static_cache_and_add_task_to_cache(otdb_id) + + def onObservationFinished(self, otdb_id, modificationTime): + """ overrides OTDBBusListener.onObservationFinished and calls self._remove_task_from_cache """ + return self._remove_task_from_cache(otdb_id) + + def onObservationAborted(self, otdb_id, modificationTime): + """ overrides OTDBBusListener.onObservationAborted and calls self._remove_task_from_cache """ + return self._remove_task_from_cache(otdb_id) + + def _update_semi_static_cache_and_add_task_to_cache(self, otdb_id): + self._update_stations_cache() + return self._add_task_to_cache(otdb_id) + + def _update_semi_static_cache(self): + # update internal project/station cache (could have been updated by a user in the meantime) + self._update_projects_cache() + self._update_stations_cache() + + def _update_projects_cache(self): + # update internal project cache (could have been updated by a user in the meantime) + self._project_cache = {str(p['name']):p for p in self._momrpc.getProjects()} + logger.info("TaskInfoCache: updated projects cache.") + + def _update_stations_cache(self): + # update internal stations cache (could have been updated by a user in the meantime) + self._stations_cache = get_current_stations('today_nl', as_host_names=False) + logger.info("TaskInfoCache: updated stations cache.") + + def _update_active_tasks_cache(self): + now = datetime.utcnow() + tasks = self._radbrpc.getTasks(lower_bound=now - timedelta(hours=6), + upper_bound=now + timedelta(hours=12), + task_status=['scheduled', 'queued', 'active', 'completing']) + + tasks_with_mom_id = [t for t in tasks if t.get('mom_id') is not None] + task_otdb_ids = [t['otdb_id'] for t in tasks_with_mom_id] + + logger.info("TaskInfoCache: adding %s active tasks to cache: %s", len(task_otdb_ids), ', '.join(str(id) for id in task_otdb_ids)) + + for otdb_id in task_otdb_ids: + self._add_task_to_cache(otdb_id) + logger.info("TaskInfoCache: updated active tasks cache.") + + def _add_task_to_cache(self, otdb_id): + logger.info("adding info for otdb_id=%s to cache", otdb_id) + + # fetch individual data for task from various rpc's + radb_task = self._radbrpc.getTask(otdb_id=otdb_id) + if radb_task.get('mom_id') is None: + logger.warning("skipping adding cache info for otdb_id=%s because it's mom_id is None.", otdb_id) + return + + parset = self._otdbrpc.taskGetSpecification(otdb_id=otdb_id)["specification"] + mom_task_info = self._momrpc.getObjectDetails(radb_task['mom_id'])[radb_task['mom_id']] + + # fetch the task's project info from the updated project cache + project_info = self.get_project_info(mom_task_info['project_name']) + self._cache[otdb_id] = TaskInfo(parset, mom_task_info, project_info, radb_task) + + logger.info("cache info for otdb_id=%s: %s", otdb_id, self._cache[otdb_id]) + + def _remove_task_from_cache(self, otdb_id): + logger.info("removing info for otdb_id=%s to cache") + if otdb_id in self._cache: + del self._cache[otdb_id] + +if __name__ == '__main__': + """Example usage""" + from lofar.common.util import waitForInterrupt + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + + # start listening on all default messaging buses, + # and let the TaskInfoCache instance log the events as they come along. + with TaskInfoCache() as cache: + waitForInterrupt() \ No newline at end of file diff --git a/SAS/TriggerServices/lib/trigger_service.py b/SAS/TriggerServices/lib/trigger_service.py index d08ed384911..22a356e7b00 100644 --- a/SAS/TriggerServices/lib/trigger_service.py +++ b/SAS/TriggerServices/lib/trigger_service.py @@ -23,25 +23,20 @@ from StringIO import StringIO from lxml import etree +from datetime import datetime from lofar.messaging import Service, EventMessage, ToBus from lofar.messaging.Service import MessageHandlerInterface from lofar.common.util import waitForInterrupt +from lofar.common.lcu_utils import stationname2hostname from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.specificationservices.specification_service_rpc import SpecificationRPC from lofar.specificationservices.validation_service_rpc import ValidationRPC from lofar.specificationservices.translation_service_rpc import TranslationRPC -from config import MOMQUERY_BUSNAME, MOMQUERY_SERVICENAME, \ - SPECIFICATION_BUSNAME, SPECIFICATION_SERVICENAME, \ - VALIDATION_BUSNAME, VALIDATION_SERVICENAME, \ - SPECIFICATIONTRANSLATION_BUSNAME, SPECIFICATIONTRANSLATION_SERVICENAME, \ - TRIGGER_SERVICENAME, TRIGGER_BUSNAME, \ - TRIGGER_ADDITION_NOTIFICATION_BUSNAME, TRIGGER_ADDITION_NOTIFICATION_SUBJECT, \ - ALERT_BROKER_HOST, ALERT_BROKER_PORT, ALERT_PACKET_TYPE_FILTER, \ - DEFAULT_TBB_CEP_NODES, DEFAULT_TBB_SUBBANDS, DEFAULT_TBB_STATIONS, \ - DEFAULT_CONSTANT_TIME_BETWEEN_SUBBAND_DUMPS, DEFAULT_DURATION_FACTOR_BETWEEN_SUBBAND_DUMPS +from .task_info_cache import TaskInfoCache +from .config import * from lofar.triggerservices.voevent_listener import VOEventListenerInterface @@ -208,65 +203,10 @@ def _get_current_tbbtype(): # This is probably a must-have for the initial implementation, so not making any assumptions here. # I think Sander wants to be able to poll for this, so we should add a function to the tbb service, and then call # that service from here, I guess. + # 20190111 JS: add the polling to the TaskInfoCache for fast lookup return 'subband' - #raise NotImplementedError -def _get_stations_recording_at(timestamp): - """ - :param timestamp: date to check as isostring - :return: stations as list of strings - """ - # todo! - # No idea how to do that without actually asking anyone? - - stations = ['cs001'] - logger.warning("The check for a stations in use at the given time is not yet implemented. Assuming these: %s" % stations) - # I interpret Sanders comment the way that the initial decision can be solely based on correct TBB mode, so I guess this is fine. - return stations - - -def _tbb_trigger_is_acceptable(project, stoptime, tbbtype, stations): - """ - Perform permission and system checks to determine whether we can actually perform a triggered TBB dump for the given project. - - :param project: project code as string - :param stoptime: isostring with freeze date - :param tbbtype: string with current tbb operation mode - :return: True if acceptable, else False - """ - if True: #_auth_allows_triggers(project): - logger.info('TBB Trigger is authorized') - if True: # _quota_allows_triggers(project): - logger.info('Trigger quota allows TBB dump') - if _get_current_tbbtype() == tbbtype: - logger.info('TBB operation mode is %s' % tbbtype) - if _observation_is_running_at(stoptime): - logger.info('Observation is running at time %s' % stoptime) - if all(station in _get_stations_recording_at(stoptime) for station in stations): - logger.info('All requested stations are part of a running observation') - return True - else: - msg = 'The requested stations %s are not observing at %s' % (stations, stoptime) - logger.error(msg) - raise Exception(msg) - else: - msg = 'Project %s is not allowed to trigger' % project - logger.error(msg) - raise Exception(msg) - else: - msg = "TBB operation mode is not '%s'." % tbbtype - logger.error(msg) - raise Exception(msg) - else: - msg = 'Trigger quota of project %s is exceeded' % project - logger.error(msg) - raise Exception(msg) - else: - msg = 'Project %s is not allowed to trigger' % project - logger.error(msg) - raise Exception(msg) - # todo: move this somewhere is can be commonly used def translate_arrival_time_to_frequency(reference_time, reference_frequency, dm, target_frequency=200): """ @@ -325,8 +265,8 @@ def do_tbb_subband_dump(starttime, duration, dm, project, triggerid, logger.warning("stoptime option is not implemented. Ignoring that one.") # todo: remove kwarg or make duration optional? # convert station list to list of lcus, as the tbb service requires it - lcus = [station + 'c' for station in stations] - station_str = ",".join(lcus) + lcus = [stationname2hostname(station) for station in stations] + lcus_str = ",".join(lcus) # convert float starttime to second and nanosecond component # todo: Do we need higher precision up to here? Investigate! @@ -347,7 +287,7 @@ def do_tbb_subband_dump(starttime, duration, dm, project, triggerid, # freezing data and starting upload logger.info("Freezing TBB data") - rpc.freeze_data(station_str, dm, sec, nsec) # This should block until all data is actually in the buffer + rpc.freeze_data(lcus_str, dm, sec, nsec) # This should block until all data is actually in the buffer logger.info("Starting TBB datawriters...") datawriter_nodes = rpc.start_datawriters(voevent_xml=voevent_xml) @@ -364,7 +304,7 @@ def do_tbb_subband_dump(starttime, duration, dm, project, triggerid, rpc.set_storage(storage_map) logger.info("Start uploading data to CEP") - rpc.upload_data(station_str, dm, starttime, duration, subbands, waittime, boards) + rpc.upload_data(lcus_str, dm, starttime, duration, subbands, waittime, boards) # restart recording logger.info("Restarting recording") @@ -375,9 +315,17 @@ class ALERTHandler(VOEventListenerInterface): """ This class implements the VOEventListenerInterface in order to receive VO events for the ALERT project. """ - project = "ALERT" # todo: set correct project code - tbbtype = "subband" - stations = DEFAULT_TBB_STATIONS + def __init__(self, broker_host='127.0.0.1', broker_port=8099, filter_for=None): + self._cache = TaskInfoCache(broker='scu001.control.lofar') + super(ALERTHandler, self).__init__(broker_host, broker_port, filter_for) + + def start_listening(self, blocking=False): + self._cache.start_listening() + super(ALERTHandler, self).start_listening(blocking=blocking) + + def stop_listening(self): + self._cache.stop_listening() + super(ALERTHandler, self).stop_listening() def handle_event(self, voevent_xml, voevent_etree): @@ -399,13 +347,9 @@ class ALERTHandler(VOEventListenerInterface): centertime = translate_arrival_time_to_frequency(reference_time, reference_frequency, dm, target_frequency=200) duration = 5.0 # tbb can hold this many seconds starttime = centertime - duration / 2.0 - stoptime = centertime - duration / 2.0 - - # The dump should be triggered on all stations that are both in _get_stations_recording_at(time) and in the list in the config file. The acceptance check later is then a little superfluous, actually... - stations_in_use = _get_stations_recording_at(stoptime) - self.stations = [station for station in self.stations if station in stations_in_use] + stoptime = centertime + duration / 2.0 - if _tbb_trigger_is_acceptable(self.project, stoptime, self.tbbtype, self.stations): + if self._tbb_trigger_is_acceptable(stoptime): logger.info('System pre-flight checks passed') # check if trigger is acceptable for PI @@ -413,9 +357,8 @@ class ALERTHandler(VOEventListenerInterface): if decider.is_acceptable(voevent_etree): logger.info('Science pre-flight checks passed') # _send_notification('ALERT Broker', ALERT_BROKER_HOST, self.project, triggerid, voevent_xml) # todo: do we want that? do we want it on same bus? - logger.info('Initiating TBB dump for trigger id %s, starttime %s, duration %s, dm %s' % (triggerid, starttime, duration, dm)) - do_tbb_subband_dump(starttime, duration, dm, self.project, triggerid, - stoptime=stoptime, stations=self.stations, voevent_xml=voevent_xml) + logger.info('Initiating TBB dump for trigger id %s, starttime %s, duration %ssec, dm %s' % (triggerid, starttime, duration, dm)) + do_tbb_subband_dump(starttime, duration, dm, DEFAULT_TBB_PROJECT, triggerid, stoptime=stoptime, stations=self._cache.get_stations()) else: raise Exception('Science pre-flight checks rejected the trigger!') else: @@ -427,6 +370,63 @@ class ALERTHandler(VOEventListenerInterface): logger.info('...done handling ALERT event.') + def _tbb_trigger_is_acceptable(self, stoptime): + """ + Perform permission and system checks to determine whether we can actually perform a triggered TBB dump for the given project. + + :param stoptime: isostring with freeze date + :return: True if acceptable, else False + """ + try: + project_info = self._cache.get_project_info(DEFAULT_TBB_PROJECT) + except KeyError: + logger.warning("Unknown project '%s'. TBB Trigger is not authorized.", DEFAULT_TBB_PROJECT) + + # Is the project allowed to trigger? + if project_info['allow_triggers']: + logger.info('TBB Trigger is authorized for project %s', DEFAULT_TBB_PROJECT) + else: + logger.warning('TBB Trigger is not authorized for project %s', DEFAULT_TBB_PROJECT) + return False + + # Are we allowed another trigger from the project's quota? + # TODO: update num_used_triggers in mom when the TBB alert trigger is done + if project_info['num_used_triggers'] < project_info['num_allowed_triggers']: + logger.info('Trigger quota allows TBB freeze/dump for project %s: num_used_triggers=%s num_allowed_triggers=%s', + DEFAULT_TBB_PROJECT, project_info['num_used_triggers'], project_info['num_allowed_triggers']) + else: + logger.warning('Trigger quota exceeded for project %s: num_used_triggers=%s num_allowed_triggers=%s', + DEFAULT_TBB_PROJECT, project_info['num_used_triggers'], project_info['num_allowed_triggers']) + return False + + # Correct TBB mode? + if _get_current_tbbtype() != DEFAULT_TBB_ALERT_MODE: # todo: move to do_tbb_subband_dump in order to switch firmware when needed? + logger.warning('TBB is currently in mode %s. Needed is mode %s.', _get_current_tbbtype(), DEFAULT_TBB_ALERT_MODE) + return False + else: + logger.info('TBB is in correct operational mode: %s' % DEFAULT_TBB_ALERT_MODE) + + # Any running observations? + active_tasks = self._cache.get_active_tasks(stoptime, 'observation') + if active_tasks: + otdb_ids = sorted([t.radb_task['otdb_id'] for t in active_tasks]) + logger.info('Observation(s) %s is/are running at time %s', otdb_ids, stoptime) + else: + logger.warning('No observations running at %s, so TBB\'s are not recording', stoptime) + return False + + active_stations = self._cache.get_stations() + active_tbb_stations = set(DEFAULT_TBB_STATIONS).intersection(active_stations) + + if len(active_tbb_stations) > 0: + logger.info('Enough TBB stations available: %s', active_tbb_stations) + else: + logger.warning('No TBB stations available. requested=%s available=%s', DEFAULT_TBB_STATIONS, active_stations) + return False + + # all prerequisites are met. + return True + def create_service(servicename=TRIGGER_SERVICENAME, busname=TRIGGER_BUSNAME): return Service(servicename, @@ -437,11 +437,11 @@ def create_service(servicename=TRIGGER_SERVICENAME, busname=TRIGGER_BUSNAME): def main(): - with create_service(): # handle vo events - logger.info("%s %s %s" %(ALERT_BROKER_HOST, ALERT_BROKER_PORT, ALERT_PACKET_TYPE_FILTER)) - alert_handler = ALERTHandler(broker_host=ALERT_BROKER_HOST, broker_port=ALERT_BROKER_PORT, filter_for=ALERT_PACKET_TYPE_FILTER) - alert_handler.start_listening() - waitForInterrupt() + with ALERTHandler(broker_host=ALERT_BROKER_HOST, broker_port=ALERT_BROKER_PORT, filter_for=ALERT_PACKET_TYPE_FILTER) as alert_handler: + waitForInterrupt() +if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + main() \ No newline at end of file diff --git a/SAS/TriggerServices/lib/voevent_listener.py b/SAS/TriggerServices/lib/voevent_listener.py index 0e90e0c97e7..a57f57d3324 100644 --- a/SAS/TriggerServices/lib/voevent_listener.py +++ b/SAS/TriggerServices/lib/voevent_listener.py @@ -75,9 +75,9 @@ class VOEventListenerInterface(object): :param broker_port: Port of the VO event broker :param filter_for: a list of integer values that define the Packet_Type of the VO events to accept """ - self.broker_host = broker_host - self.broker_port = broker_port - self.filter_for = filter_for + self._broker_host = broker_host + self._broker_port = broker_port + self._filter_for = filter_for def handle_event(self, xml_string, lxml_etree_root): """ @@ -90,8 +90,18 @@ class VOEventListenerInterface(object): """ raise NotImplementedError("Override this function with your custom behavior to react to this event!") + def __enter__(self): + '''entering 'with' context, starts listening''' + self.start_listening(blocking=False) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + '''exiting 'with' context, stops listening''' + self.stop_listening() + def start_listening(self, blocking=False): - args = (self.handle_event, self.broker_host, self.broker_port, self.filter_for) + logger.info('start listening for VOEvents on %s:%s with filter: %s', self._broker_host, self._broker_port, self._filter_for) + args = (self.handle_event, self._broker_host, self._broker_port, self._filter_for) if blocking: listen_for_voevents(*args) else: @@ -99,6 +109,10 @@ class VOEventListenerInterface(object): t.daemon = True t.start() + def stop_listening(self): + '''should stop listening, but we can't due to gcn's blocking listen_for_voevents method...''' + logger.warning('Cannot stop listening for VOEvents on %s:%s because the gcn package only offers a blocking listener...', + self._broker_host, self._broker_port) class _SimpleVOEventListener(VOEventListenerInterface): """ -- GitLab