-
Jorrit Schaap authored
SW-567: added and used TaskInfoCache for fast lookup of info needed to accept/reject tbb alert voevents. The TaskInfoCache preloads all current information upon start, and then resfreshes itself on task status events, so it's always up to date.
Jorrit Schaap authoredSW-567: added and used TaskInfoCache for fast lookup of info needed to accept/reject tbb alert voevents. The TaskInfoCache preloads all current information upon start, and then resfreshes itself on task status events, so it's always up to date.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
task_info_cache.py 10.52 KiB
#!/usr/bin/env python
# Copyright (C) 2015-2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id$
"""
task_info_cache is a module which provides the TaskInfoCache class which caches the info for the current active tasks (observation/pipeline)"""
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.common.lcu_utils import get_current_stations
from pprint import pformat
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class TaskInfo(object):
def __init__(self, parset, mom_task, mom_project, radb_task):
self.parset = parset
self.mom_task = mom_task
self.mom_project = mom_project
self.radb_task = radb_task
def __str__(self):
return pformat(self.parset) + '\n' + \
pformat(self.mom_task) + '\n' + \
pformat(self.mom_project) + '\n' + \
pformat(self.radb_task)
class TaskInfoCache(OTDBBusListener):
"""
"""
def __init__(self,
otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
otdb_servicename=DEFAULT_OTDB_SERVICENAME,
radb_busname=RADB_BUSNAME,
radb_servicename=RADB_SERVICENAME,
mom_busname=DEFAULT_MOMQUERY_BUSNAME,
mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
broker=None,
radb_dbcreds=None):
"""
Creates a TaskInfoCache instance, which listens for OTDB task status events, and then fetches and caches relevant info for the current active task(s).
:param otdb_notification_busname:
:param otdb_notification_subject:
:param otdb_busname:
:param otdb_servicename:
:param mom_busname:
:param mom_servicename:
:param broker:
:param radb_dbcreds:
"""
# init the OTDBBusListener
super(TaskInfoCache, self).__init__(busname=otdb_notification_busname, subject=otdb_notification_subject)#, broker=broker)
# the internal cache is a dict with a mapping of otdb_id->TaskInfo
self._cache = {}
# the internal project cache is a dict with a mapping of project_name->project_info_dict
self._project_cache = {}
# the internal stations cache is a list of the currently used stations
self._stations_cache = []
# internal rpc's to fetch the needed information
self._otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180)
self._momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker, timeout=180)
self._radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker, timeout=180)
def get_cached_tasks_otdb_ids(self):
return self._cache.keys()
def get_active_tasks(self, active_at, task_type=None):
'''
get a list of tasks which are active at the given timestamp (t.start <= active_at <= t.end)
:param active_at: datetime
:param task_type: string like 'observation' or 'pipeline' to filter by task type. No filtering is applied when task_type=None.
:return: list of active TaskInfo's
'''
tasks = [ti for ti in self._cache.values()
if ti.radb_task['starttime'] <= active_at and ti.radb_task['endtime'] >= active_at]
if task_type is not None:
tasks = [ti for ti in tasks
if ti.radb_task['task_type'] == task_type]
return tasks
def get_task_info(self, otdb_id):
return self._cache[int(otdb_id)]
def get_project_info(self, project_name):
return self._project_cache[project_name]
def get_project_names(self):
return sorted(self._project_cache.keys())
def get_stations(self):
return self._stations_cache
def start_listening(self, numthreads=None):
logger.info("TaskInfoCache starting to listening for upcoming tasks...")
self._otdbrpc.open()
self._momrpc.open()
self._radbrpc.open()
super(TaskInfoCache, self).start_listening()
# make sure we start with a filled projects/stations cache
self._update_projects_cache()
self._update_stations_cache()
self._update_active_tasks_cache()
logger.info("TaskInfoCache is ready for use, listening for upcoming tasks, and preloaded with projects, stations and active tasks.")
def stop_listening(self):
self._otdbrpc.close()
self._momrpc.close()
self._radbrpc.close()
super(TaskInfoCache, self).stop_listening()
logger.info("TaskInfoCache stopped listening for upcoming tasks.")
def onObservationScheduled(self, otdb_id, modificationTime):
""" overrides OTDBBusListener.onObservationQueued and calls self._add_task_to_cache """
return self._update_semi_static_cache_and_add_task_to_cache(otdb_id)
def onObservationQueued(self, otdb_id, modificationTime):
""" overrides OTDBBusListener.onObservationQueued and calls self._add_task_to_cache """
# update internal project/station cache (could have been updated by a user in the meantime)
return self._update_semi_static_cache_and_add_task_to_cache(otdb_id)
def onObservationStarted(self, otdb_id, modificationTime):
""" overrides OTDBBusListener.onObservationStarted and calls self._add_task_to_cache """
return self._update_semi_static_cache_and_add_task_to_cache(otdb_id)
def onObservationFinished(self, otdb_id, modificationTime):
""" overrides OTDBBusListener.onObservationFinished and calls self._remove_task_from_cache """
return self._remove_task_from_cache(otdb_id)
def onObservationAborted(self, otdb_id, modificationTime):
""" overrides OTDBBusListener.onObservationAborted and calls self._remove_task_from_cache """
return self._remove_task_from_cache(otdb_id)
def _update_semi_static_cache_and_add_task_to_cache(self, otdb_id):
self._update_stations_cache()
return self._add_task_to_cache(otdb_id)
def _update_semi_static_cache(self):
# update internal project/station cache (could have been updated by a user in the meantime)
self._update_projects_cache()
self._update_stations_cache()
def _update_projects_cache(self):
# update internal project cache (could have been updated by a user in the meantime)
self._project_cache = {str(p['name']):p for p in self._momrpc.getProjects()}
logger.info("TaskInfoCache: updated projects cache.")
def _update_stations_cache(self):
# update internal stations cache (could have been updated by a user in the meantime)
self._stations_cache = get_current_stations('today_nl', as_host_names=False)
logger.info("TaskInfoCache: updated stations cache.")
def _update_active_tasks_cache(self):
now = datetime.utcnow()
tasks = self._radbrpc.getTasks(lower_bound=now - timedelta(hours=6),
upper_bound=now + timedelta(hours=12),
task_status=['scheduled', 'queued', 'active', 'completing'])
tasks_with_mom_id = [t for t in tasks if t.get('mom_id') is not None]
task_otdb_ids = [t['otdb_id'] for t in tasks_with_mom_id]
logger.info("TaskInfoCache: adding %s active tasks to cache: %s", len(task_otdb_ids), ', '.join(str(id) for id in task_otdb_ids))
for otdb_id in task_otdb_ids:
self._add_task_to_cache(otdb_id)
logger.info("TaskInfoCache: updated active tasks cache.")
def _add_task_to_cache(self, otdb_id):
logger.info("adding info for otdb_id=%s to cache", otdb_id)
# fetch individual data for task from various rpc's
radb_task = self._radbrpc.getTask(otdb_id=otdb_id)
if radb_task.get('mom_id') is None:
logger.warning("skipping adding cache info for otdb_id=%s because it's mom_id is None.", otdb_id)
return
parset = self._otdbrpc.taskGetSpecification(otdb_id=otdb_id)["specification"]
mom_task_info = self._momrpc.getObjectDetails(radb_task['mom_id'])[radb_task['mom_id']]
# fetch the task's project info from the updated project cache
project_info = self.get_project_info(mom_task_info['project_name'])
self._cache[otdb_id] = TaskInfo(parset, mom_task_info, project_info, radb_task)
logger.info("cache info for otdb_id=%s: %s", otdb_id, self._cache[otdb_id])
def _remove_task_from_cache(self, otdb_id):
logger.info("removing info for otdb_id=%s to cache")
if otdb_id in self._cache:
del self._cache[otdb_id]
if __name__ == '__main__':
"""Example usage"""
from lofar.common.util import waitForInterrupt
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
# start listening on all default messaging buses,
# and let the TaskInfoCache instance log the events as they come along.
with TaskInfoCache() as cache:
waitForInterrupt()