Skip to content
Snippets Groups Projects
Commit 222b5d0b authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-567: added and used TaskInfoCache for fast lookup of info needed to...

SW-567: added and used TaskInfoCache for fast lookup of info needed to accept/reject tbb alert voevents. The TaskInfoCache preloads all current information upon start, and then refreshes itself on task status events, so it's always up to date.
parent 30593360
No related branches found
No related tags found
No related merge requests found
......@@ -5289,6 +5289,7 @@ SAS/TriggerServices/doc/trigger_services.md -text
SAS/TriggerServices/lib/CMakeLists.txt -text
SAS/TriggerServices/lib/__init__.py -text
SAS/TriggerServices/lib/config.py -text
SAS/TriggerServices/lib/task_info_cache.py -text
SAS/TriggerServices/lib/trigger_cancellation_service.py -text
SAS/TriggerServices/lib/trigger_service.py -text
SAS/TriggerServices/lib/trigger_service_rpc.py -text
......
lofar_find_package(Python 2.7 REQUIRED)
lofar_package(TriggerServices 0.1 DEPENDS PyMessaging MoMQueryService SpecificationServices OTDB_Services)
lofar_package(TriggerServices 0.1 DEPENDS PyMessaging MoMQueryService SpecificationServices OTDB_Services ResourceAssignmentService TBB TBBClient)
include(PythonInstall)
......
......@@ -9,6 +9,7 @@ set(_py_files
trigger_cancellation_service.py
voevent_listener.py
voevent_decider.py
task_info_cache.py
config.py
)
......
......@@ -38,5 +38,9 @@ ALERT_PACKET_TYPE_FILTER = None # list of int or None for all
DEFAULT_TBB_CEP_NODES = None # list of nodes to dump to, e.g. ["cpu%s" % str(num).zfill(2) for num in expand_list("01-50")], or None for all available
DEFAULT_TBB_SUBBANDS = "10-496" # The subbands to dump. Note: When starting the recording (tbbservice_start_recording), the subband range HAS to cover 487 subbands (typically 10-496)
DEFAULT_TBB_STATIONS = ['cs001','cs002','cs003','cs004','cs005','cs006','cs007','cs011','cs013','cs017','cs021','cs024','cs026','cs028','cs030','cs031','cs032','cs101','cs103','cs201','cs301','cs302','cs401','cs501','rs106','rs205','rs208','rs210','rs305','rs306','rs307','rs310','rs406','rs407','rs409','rs503','rs508','rs509'] # List of stations to include in tbb dump (filtered for those who are actually observing at event ToA), or None for all observing.
DEFAULT_TBB_PROJECT = "COM_ALERT"
DEFAULT_TBB_ALERT_MODE = "subband"
DEFAULT_CONSTANT_TIME_BETWEEN_SUBBAND_DUMPS = 0 # constant time in seconds to wait between subband dumps
DEFAULT_DURATION_FACTOR_BETWEEN_SUBBAND_DUMPS = 0.00012 # linear scaling factor to increase time between dumps with duration
\ No newline at end of file
#!/usr/bin/env python
# Copyright (C) 2015-2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id$
"""
task_info_cache is a module which provides the TaskInfoCache class which caches the info for the current active tasks (observation/pipeline)"""
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.common.lcu_utils import get_current_stations
from pprint import pformat
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class TaskInfo(object):
    """Simple value object bundling all cached information about one task."""

    def __init__(self, parset, mom_task, mom_project, radb_task):
        """
        :param parset: dict, the task's OTDB parset/specification
        :param mom_task: dict with the task's MoM object details
        :param mom_project: dict with the details of the project the task belongs to
        :param radb_task: dict with the task's RADB entry
        """
        self.parset = parset
        self.mom_task = mom_task
        self.mom_project = mom_project
        self.radb_task = radb_task

    def __str__(self):
        # pretty-print each part on its own line(s), in constructor-argument order
        parts = (self.parset, self.mom_task, self.mom_project, self.radb_task)
        return '\n'.join(pformat(part) for part in parts)
class TaskInfoCache(OTDBBusListener):
    """
    TaskInfoCache listens on the OTDB notification bus for task status events and keeps an
    up-to-date in-memory cache of TaskInfo objects (parset + MoM task/project info + RADB
    task info) for the currently active tasks. It also caches the known MoM projects and
    the currently used stations, so lookups are fast and need no rpc round-trip.
    """
    def __init__(self,
                 otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
                 otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
                 otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
                 otdb_servicename=DEFAULT_OTDB_SERVICENAME,
                 radb_busname=RADB_BUSNAME,
                 radb_servicename=RADB_SERVICENAME,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 broker=None,
                 radb_dbcreds=None):
        """
        Creates a TaskInfoCache instance, which listens for OTDB task status events, and then fetches and caches relevant info for the current active task(s).
        :param otdb_notification_busname: name of the bus on which OTDB task status notifications are published
        :param otdb_notification_subject: subject of the OTDB task status notifications
        :param otdb_busname: name of the bus on which the OTDB service listens
        :param otdb_servicename: name of the OTDB service
        :param radb_busname: name of the bus on which the resource assignment (RADB) service listens
        :param radb_servicename: name of the RADB service
        :param mom_busname: name of the bus on which the MoM query service listens
        :param mom_servicename: name of the MoM query service
        :param broker: optional qpid broker address used for the rpc's
                       (NOTE(review): not forwarded to the notification listener — confirm whether that is intentional)
        :param radb_dbcreds: optional credentials for direct RADB access (currently unused here)
        """
        # init the OTDBBusListener so we receive the task status events
        super(TaskInfoCache, self).__init__(busname=otdb_notification_busname, subject=otdb_notification_subject)#, broker=broker)

        # the internal cache is a dict with a mapping of otdb_id->TaskInfo
        self._cache = {}

        # the internal project cache is a dict with a mapping of project_name->project_info_dict
        self._project_cache = {}

        # the internal stations cache is a list of the currently used stations
        self._stations_cache = []

        # internal rpc's to fetch the needed information
        self._otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180)
        self._momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker, timeout=180)
        self._radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker, timeout=180)

    def get_cached_tasks_otdb_ids(self):
        """:return: the otdb_ids of all currently cached tasks"""
        return self._cache.keys()

    def get_active_tasks(self, active_at, task_type=None):
        '''
        get a list of tasks which are active at the given timestamp (t.start <= active_at <= t.end)
        :param active_at: datetime
        :param task_type: string like 'observation' or 'pipeline' to filter by task type. No filtering is applied when task_type=None.
        :return: list of active TaskInfo's
        '''
        tasks = [ti for ti in self._cache.values()
                 if ti.radb_task['starttime'] <= active_at and ti.radb_task['endtime'] >= active_at]

        if task_type is not None:
            tasks = [ti for ti in tasks
                     if ti.radb_task['task_type'] == task_type]

        return tasks

    def get_task_info(self, otdb_id):
        """
        :param otdb_id: the task's otdb id (int, or int-convertible string)
        :return: the cached TaskInfo for the given otdb_id
        :raises KeyError: when the task is not in the cache
        """
        return self._cache[int(otdb_id)]

    def get_project_info(self, project_name):
        """
        :param project_name: name of the MoM project
        :return: the cached project info dict
        :raises KeyError: when the project is unknown
        """
        return self._project_cache[project_name]

    def get_project_names(self):
        """:return: sorted list of all cached project names"""
        return sorted(self._project_cache.keys())

    def get_stations(self):
        """:return: the cached list of currently used stations"""
        return self._stations_cache

    def start_listening(self, numthreads=None):
        """Open the rpc's, start listening for task status events, and preload the caches."""
        logger.info("TaskInfoCache starting to listening for upcoming tasks...")
        self._otdbrpc.open()
        self._momrpc.open()
        self._radbrpc.open()
        super(TaskInfoCache, self).start_listening()

        # make sure we start with a filled projects/stations cache
        self._update_projects_cache()
        self._update_stations_cache()

        self._update_active_tasks_cache()
        logger.info("TaskInfoCache is ready for use, listening for upcoming tasks, and preloaded with projects, stations and active tasks.")

    def stop_listening(self):
        """Close the rpc's and stop listening for task status events."""
        self._otdbrpc.close()
        self._momrpc.close()
        self._radbrpc.close()
        super(TaskInfoCache, self).stop_listening()
        logger.info("TaskInfoCache stopped listening for upcoming tasks.")

    def onObservationScheduled(self, otdb_id, modificationTime):
        """ overrides OTDBBusListener.onObservationScheduled and calls self._update_semi_static_cache_and_add_task_to_cache """
        return self._update_semi_static_cache_and_add_task_to_cache(otdb_id)

    def onObservationQueued(self, otdb_id, modificationTime):
        """ overrides OTDBBusListener.onObservationQueued and calls self._update_semi_static_cache_and_add_task_to_cache """
        # update internal project/station cache (could have been updated by a user in the meantime)
        return self._update_semi_static_cache_and_add_task_to_cache(otdb_id)

    def onObservationStarted(self, otdb_id, modificationTime):
        """ overrides OTDBBusListener.onObservationStarted and calls self._update_semi_static_cache_and_add_task_to_cache """
        return self._update_semi_static_cache_and_add_task_to_cache(otdb_id)

    def onObservationFinished(self, otdb_id, modificationTime):
        """ overrides OTDBBusListener.onObservationFinished and calls self._remove_task_from_cache """
        return self._remove_task_from_cache(otdb_id)

    def onObservationAborted(self, otdb_id, modificationTime):
        """ overrides OTDBBusListener.onObservationAborted and calls self._remove_task_from_cache """
        return self._remove_task_from_cache(otdb_id)

    def _update_semi_static_cache_and_add_task_to_cache(self, otdb_id):
        # consistency fix: refresh projects AND stations via the dedicated helper (it was
        # defined but never called; only the stations cache was refreshed before), so that
        # _add_task_to_cache below can also resolve newly added projects.
        self._update_semi_static_cache()
        return self._add_task_to_cache(otdb_id)

    def _update_semi_static_cache(self):
        # update internal project/station cache (could have been updated by a user in the meantime)
        self._update_projects_cache()
        self._update_stations_cache()

    def _update_projects_cache(self):
        # update internal project cache (could have been updated by a user in the meantime)
        self._project_cache = {str(p['name']):p for p in self._momrpc.getProjects()}
        logger.info("TaskInfoCache: updated projects cache.")

    def _update_stations_cache(self):
        # update internal stations cache (could have been updated by a user in the meantime)
        self._stations_cache = get_current_stations('today_nl', as_host_names=False)
        logger.info("TaskInfoCache: updated stations cache.")

    def _update_active_tasks_cache(self):
        # preload the cache with all tasks around 'now' which are in (or near) an active state
        now = datetime.utcnow()
        tasks = self._radbrpc.getTasks(lower_bound=now - timedelta(hours=6),
                                       upper_bound=now + timedelta(hours=12),
                                       task_status=['scheduled', 'queued', 'active', 'completing'])
        # tasks without a mom_id cannot be enriched with mom info, so skip them
        tasks_with_mom_id = [t for t in tasks if t.get('mom_id') is not None]
        task_otdb_ids = [t['otdb_id'] for t in tasks_with_mom_id]
        logger.info("TaskInfoCache: adding %s active tasks to cache: %s", len(task_otdb_ids), ', '.join(str(id) for id in task_otdb_ids))
        for otdb_id in task_otdb_ids:
            self._add_task_to_cache(otdb_id)
        logger.info("TaskInfoCache: updated active tasks cache.")

    def _add_task_to_cache(self, otdb_id):
        logger.info("adding info for otdb_id=%s to cache", otdb_id)

        # fetch individual data for task from various rpc's
        radb_task = self._radbrpc.getTask(otdb_id=otdb_id)
        if radb_task.get('mom_id') is None:
            logger.warning("skipping adding cache info for otdb_id=%s because it's mom_id is None.", otdb_id)
            return

        parset = self._otdbrpc.taskGetSpecification(otdb_id=otdb_id)["specification"]
        mom_task_info = self._momrpc.getObjectDetails(radb_task['mom_id'])[radb_task['mom_id']]

        # fetch the task's project info from the updated project cache
        project_info = self.get_project_info(mom_task_info['project_name'])

        self._cache[otdb_id] = TaskInfo(parset, mom_task_info, project_info, radb_task)
        logger.info("cache info for otdb_id=%s: %s", otdb_id, self._cache[otdb_id])

    def _remove_task_from_cache(self, otdb_id):
        # bugfix: the otdb_id argument was missing from the log call (so a literal '%s' was
        # logged), and the message said 'to cache' instead of 'from cache'.
        logger.info("removing info for otdb_id=%s from cache", otdb_id)
        if otdb_id in self._cache:
            del self._cache[otdb_id]
if __name__ == '__main__':
    """Example usage"""
    from lofar.common.util import waitForInterrupt

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

    # start listening on all default messaging buses,
    # and let the TaskInfoCache instance log the events as they come along.
    # NOTE(review): relies on 'with' support presumably inherited from OTDBBusListener
    # (TaskInfoCache itself defines no __enter__/__exit__) — confirm.
    with TaskInfoCache() as cache:
        waitForInterrupt()
\ No newline at end of file
......@@ -23,25 +23,20 @@
from StringIO import StringIO
from lxml import etree
from datetime import datetime
from lofar.messaging import Service, EventMessage, ToBus
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import waitForInterrupt
from lofar.common.lcu_utils import stationname2hostname
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.specificationservices.specification_service_rpc import SpecificationRPC
from lofar.specificationservices.validation_service_rpc import ValidationRPC
from lofar.specificationservices.translation_service_rpc import TranslationRPC
from config import MOMQUERY_BUSNAME, MOMQUERY_SERVICENAME, \
SPECIFICATION_BUSNAME, SPECIFICATION_SERVICENAME, \
VALIDATION_BUSNAME, VALIDATION_SERVICENAME, \
SPECIFICATIONTRANSLATION_BUSNAME, SPECIFICATIONTRANSLATION_SERVICENAME, \
TRIGGER_SERVICENAME, TRIGGER_BUSNAME, \
TRIGGER_ADDITION_NOTIFICATION_BUSNAME, TRIGGER_ADDITION_NOTIFICATION_SUBJECT, \
ALERT_BROKER_HOST, ALERT_BROKER_PORT, ALERT_PACKET_TYPE_FILTER, \
DEFAULT_TBB_CEP_NODES, DEFAULT_TBB_SUBBANDS, DEFAULT_TBB_STATIONS, \
DEFAULT_CONSTANT_TIME_BETWEEN_SUBBAND_DUMPS, DEFAULT_DURATION_FACTOR_BETWEEN_SUBBAND_DUMPS
from .task_info_cache import TaskInfoCache
from .config import *
from lofar.triggerservices.voevent_listener import VOEventListenerInterface
......@@ -208,65 +203,10 @@ def _get_current_tbbtype():
# This is probably a must-have for the initial implementation, so not making any assumptions here.
# I think Sander wants to be able to poll for this, so we should add a function to the tbb service, and then call
# that service from here, I guess.
# 20190111 JS: add the polling to the TaskInfoCache for fast lookup
return 'subband'
#raise NotImplementedError
def _get_stations_recording_at(timestamp):
"""
:param timestamp: date to check as isostring
:return: stations as list of strings
"""
# todo!
# No idea how to do that without actually asking anyone?
stations = ['cs001']
logger.warning("The check for a stations in use at the given time is not yet implemented. Assuming these: %s" % stations)
# I interpret Sanders comment the way that the initial decision can be solely based on correct TBB mode, so I guess this is fine.
return stations
def _tbb_trigger_is_acceptable(project, stoptime, tbbtype, stations):
    """
    Perform permission and system checks to determine whether we can actually perform a triggered TBB dump for the given project.
    :param project: project code as string
    :param stoptime: isostring with freeze date
    :param tbbtype: string with current tbb operation mode
    :param stations: list of station names the dump should cover
    :return: True if acceptable
    :raises Exception: with a descriptive message when any check fails
    """
    # guard-clause style: fail fast on the first unmet prerequisite instead of 5-deep nesting

    if not True:  # todo: replace placeholder with: not _auth_allows_triggers(project)
        msg = 'Project %s is not allowed to trigger' % project
        logger.error(msg)
        raise Exception(msg)
    logger.info('TBB Trigger is authorized')

    if not True:  # todo: replace placeholder with: not _quota_allows_triggers(project)
        msg = 'Trigger quota of project %s is exceeded' % project
        logger.error(msg)
        raise Exception(msg)
    logger.info('Trigger quota allows TBB dump')

    if _get_current_tbbtype() != tbbtype:
        msg = "TBB operation mode is not '%s'." % tbbtype
        logger.error(msg)
        raise Exception(msg)
    logger.info('TBB operation mode is %s' % tbbtype)

    if not _observation_is_running_at(stoptime):
        # bugfix: this branch used to raise the unrelated 'Project %s is not allowed to
        # trigger' message (copy/paste of the authorization branch)
        msg = 'No observation is running at time %s' % stoptime
        logger.error(msg)
        raise Exception(msg)
    logger.info('Observation is running at time %s' % stoptime)

    if not all(station in _get_stations_recording_at(stoptime) for station in stations):
        msg = 'The requested stations %s are not observing at %s' % (stations, stoptime)
        logger.error(msg)
        raise Exception(msg)
    logger.info('All requested stations are part of a running observation')

    return True
# todo: move this somewhere is can be commonly used
def translate_arrival_time_to_frequency(reference_time, reference_frequency, dm, target_frequency=200):
"""
......@@ -325,8 +265,8 @@ def do_tbb_subband_dump(starttime, duration, dm, project, triggerid,
logger.warning("stoptime option is not implemented. Ignoring that one.") # todo: remove kwarg or make duration optional?
# convert station list to list of lcus, as the tbb service requires it
lcus = [station + 'c' for station in stations]
station_str = ",".join(lcus)
lcus = [stationname2hostname(station) for station in stations]
lcus_str = ",".join(lcus)
# convert float starttime to second and nanosecond component
# todo: Do we need higher precision up to here? Investigate!
......@@ -347,7 +287,7 @@ def do_tbb_subband_dump(starttime, duration, dm, project, triggerid,
# freezing data and starting upload
logger.info("Freezing TBB data")
rpc.freeze_data(station_str, dm, sec, nsec) # This should block until all data is actually in the buffer
rpc.freeze_data(lcus_str, dm, sec, nsec) # This should block until all data is actually in the buffer
logger.info("Starting TBB datawriters...")
datawriter_nodes = rpc.start_datawriters(voevent_xml=voevent_xml)
......@@ -364,7 +304,7 @@ def do_tbb_subband_dump(starttime, duration, dm, project, triggerid,
rpc.set_storage(storage_map)
logger.info("Start uploading data to CEP")
rpc.upload_data(station_str, dm, starttime, duration, subbands, waittime, boards)
rpc.upload_data(lcus_str, dm, starttime, duration, subbands, waittime, boards)
# restart recording
logger.info("Restarting recording")
......@@ -375,9 +315,17 @@ class ALERTHandler(VOEventListenerInterface):
"""
This class implements the VOEventListenerInterface in order to receive VO events for the ALERT project.
"""
project = "ALERT" # todo: set correct project code
tbbtype = "subband"
stations = DEFAULT_TBB_STATIONS
def __init__(self, broker_host='127.0.0.1', broker_port=8099, filter_for=None,
             cache_broker='scu001.control.lofar'):
    """
    :param broker_host: host of the VO event broker
    :param broker_port: port of the VO event broker
    :param filter_for: list of int Packet_Type values of the VO events to accept, or None for all
    :param cache_broker: messaging broker for the internal TaskInfoCache
                         (generalized from a hard-coded value; default kept for backward compatibility)
    """
    # cache of current task/project/station info, used by the pre-flight checks
    self._cache = TaskInfoCache(broker=cache_broker)
    super(ALERTHandler, self).__init__(broker_host, broker_port, filter_for)
def start_listening(self, blocking=False):
    """Start the internal TaskInfoCache first (so event handling can do cache lookups),
    then start listening for VO events.
    :param blocking: when True, block in the VO event listener; otherwise listen on a background thread
    """
    self._cache.start_listening()
    super(ALERTHandler, self).start_listening(blocking=blocking)
def stop_listening(self):
    """Stop the internal TaskInfoCache, then stop listening for VO events."""
    self._cache.stop_listening()
    super(ALERTHandler, self).stop_listening()
def handle_event(self, voevent_xml, voevent_etree):
......@@ -399,13 +347,9 @@ class ALERTHandler(VOEventListenerInterface):
centertime = translate_arrival_time_to_frequency(reference_time, reference_frequency, dm, target_frequency=200)
duration = 5.0 # tbb can hold this many seconds
starttime = centertime - duration / 2.0
stoptime = centertime - duration / 2.0
# The dump should be triggered on all stations that are both in _get_stations_recording_at(time) and in the list in the config file. The acceptance check later is then a little superfluous, actually...
stations_in_use = _get_stations_recording_at(stoptime)
self.stations = [station for station in self.stations if station in stations_in_use]
stoptime = centertime + duration / 2.0
if _tbb_trigger_is_acceptable(self.project, stoptime, self.tbbtype, self.stations):
if self._tbb_trigger_is_acceptable(stoptime):
logger.info('System pre-flight checks passed')
# check if trigger is acceptable for PI
......@@ -413,9 +357,8 @@ class ALERTHandler(VOEventListenerInterface):
if decider.is_acceptable(voevent_etree):
logger.info('Science pre-flight checks passed')
# _send_notification('ALERT Broker', ALERT_BROKER_HOST, self.project, triggerid, voevent_xml) # todo: do we want that? do we want it on same bus?
logger.info('Initiating TBB dump for trigger id %s, starttime %s, duration %s, dm %s' % (triggerid, starttime, duration, dm))
do_tbb_subband_dump(starttime, duration, dm, self.project, triggerid,
stoptime=stoptime, stations=self.stations, voevent_xml=voevent_xml)
logger.info('Initiating TBB dump for trigger id %s, starttime %s, duration %ssec, dm %s' % (triggerid, starttime, duration, dm))
do_tbb_subband_dump(starttime, duration, dm, DEFAULT_TBB_PROJECT, triggerid, stoptime=stoptime, stations=self._cache.get_stations())
else:
raise Exception('Science pre-flight checks rejected the trigger!')
else:
......@@ -427,6 +370,63 @@ class ALERTHandler(VOEventListenerInterface):
logger.info('...done handling ALERT event.')
def _tbb_trigger_is_acceptable(self, stoptime):
    """
    Perform permission and system checks to determine whether we can actually perform a triggered TBB dump for the given project.
    :param stoptime: isostring with freeze date
    :return: True if acceptable, else False
    """
    # Is the project known at all?
    try:
        project_info = self._cache.get_project_info(DEFAULT_TBB_PROJECT)
    except KeyError:
        logger.warning("Unknown project '%s'. TBB Trigger is not authorized.", DEFAULT_TBB_PROJECT)
        return False  # bugfix: original fell through here, hitting a NameError on project_info below

    # Is the project allowed to trigger?
    if project_info['allow_triggers']:
        logger.info('TBB Trigger is authorized for project %s', DEFAULT_TBB_PROJECT)
    else:
        logger.warning('TBB Trigger is not authorized for project %s', DEFAULT_TBB_PROJECT)
        return False

    # Are we allowed another trigger from the project's quota?
    # TODO: update num_used_triggers in mom when the TBB alert trigger is done
    if project_info['num_used_triggers'] < project_info['num_allowed_triggers']:
        logger.info('Trigger quota allows TBB freeze/dump for project %s: num_used_triggers=%s num_allowed_triggers=%s',
                    DEFAULT_TBB_PROJECT, project_info['num_used_triggers'], project_info['num_allowed_triggers'])
    else:
        logger.warning('Trigger quota exceeded for project %s: num_used_triggers=%s num_allowed_triggers=%s',
                       DEFAULT_TBB_PROJECT, project_info['num_used_triggers'], project_info['num_allowed_triggers'])
        return False

    # Correct TBB mode?
    if _get_current_tbbtype() != DEFAULT_TBB_ALERT_MODE:  # todo: move to do_tbb_subband_dump in order to switch firmware when needed?
        logger.warning('TBB is currently in mode %s. Needed is mode %s.', _get_current_tbbtype(), DEFAULT_TBB_ALERT_MODE)
        return False
    else:
        logger.info('TBB is in correct operational mode: %s' % DEFAULT_TBB_ALERT_MODE)

    # Any running observations?
    active_tasks = self._cache.get_active_tasks(stoptime, 'observation')
    if active_tasks:
        otdb_ids = sorted([t.radb_task['otdb_id'] for t in active_tasks])
        logger.info('Observation(s) %s is/are running at time %s', otdb_ids, stoptime)
    else:
        logger.warning('No observations running at %s, so TBB\'s are not recording', stoptime)
        return False

    # Are enough of the configured TBB stations actually observing?
    active_stations = self._cache.get_stations()
    active_tbb_stations = set(DEFAULT_TBB_STATIONS).intersection(active_stations)
    if len(active_tbb_stations) > 0:
        logger.info('Enough TBB stations available: %s', active_tbb_stations)
    else:
        logger.warning('No TBB stations available. requested=%s available=%s', DEFAULT_TBB_STATIONS, active_stations)
        return False

    # all prerequisites are met.
    return True
def create_service(servicename=TRIGGER_SERVICENAME, busname=TRIGGER_BUSNAME):
return Service(servicename,
......@@ -437,11 +437,11 @@ def create_service(servicename=TRIGGER_SERVICENAME, busname=TRIGGER_BUSNAME):
def main():
    """Run the trigger service and an ALERTHandler listening for VO events until interrupted."""
    with create_service():
        # handle vo events
        logger.info("%s %s %s" %(ALERT_BROKER_HOST, ALERT_BROKER_PORT, ALERT_PACKET_TYPE_FILTER))
        # NOTE: the scrape of this diff interleaved the old (manual start_listening) and new
        # ('with' context) handler setup; this is the post-change version which starts the
        # handler exactly once via its context manager.
        with ALERTHandler(broker_host=ALERT_BROKER_HOST, broker_port=ALERT_BROKER_PORT, filter_for=ALERT_PACKET_TYPE_FILTER) as alert_handler:
            waitForInterrupt()

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
    main()
\ No newline at end of file
......@@ -75,9 +75,9 @@ class VOEventListenerInterface(object):
:param broker_port: Port of the VO event broker
:param filter_for: a list of integer values that define the Packet_Type of the VO events to accept
"""
self.broker_host = broker_host
self.broker_port = broker_port
self.filter_for = filter_for
self._broker_host = broker_host
self._broker_port = broker_port
self._filter_for = filter_for
def handle_event(self, xml_string, lxml_etree_root):
"""
......@@ -90,8 +90,18 @@ class VOEventListenerInterface(object):
"""
raise NotImplementedError("Override this function with your custom behavior to react to this event!")
def __enter__(self):
    '''entering 'with' context, starts listening'''
    # non-blocking, so the caller's 'with' body runs while events arrive in the background
    self.start_listening(blocking=False)
    return self
def __exit__(self, exc_type, exc_val, exc_tb):
    '''exiting 'with' context, stops listening'''
    # exceptions are not suppressed (implicitly returns None)
    self.stop_listening()
def start_listening(self, blocking=False):
args = (self.handle_event, self.broker_host, self.broker_port, self.filter_for)
logger.info('start listening for VOEvents on %s:%s with filter: %s', self._broker_host, self._broker_port, self._filter_for)
args = (self.handle_event, self._broker_host, self._broker_port, self._filter_for)
if blocking:
listen_for_voevents(*args)
else:
......@@ -99,6 +109,10 @@ class VOEventListenerInterface(object):
t.daemon = True
t.start()
def stop_listening(self):
    '''should stop listening, but we can't due to gcn's blocking listen_for_voevents method...'''
    # NOTE: intentionally a no-op apart from the warning; the listener thread is started as a
    # daemon (see start_listening), so it will not keep the process alive on shutdown.
    logger.warning('Cannot stop listening for VOEvents on %s:%s because the gcn package only offers a blocking listener...',
                   self._broker_host, self._broker_port)
class _SimpleVOEventListener(VOEventListenerInterface):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment