Skip to content
Snippets Groups Projects
Commit 9a8775e1 authored by Adriaan Renting's avatar Adriaan Renting
Browse files

Task #10557: added TaskPrescheduler to put task with trigger on prescheduled

parent 8a7d8013
No related branches found
No related tags found
No related merge requests found
Showing
with 1479 additions and 1 deletion
......@@ -5154,6 +5154,15 @@ SAS/ResourceAssignment/SystemStatusService/test/CMakeLists.txt -text
SAS/ResourceAssignment/SystemStatusService/test/test_datamonitorqueueservice_and_rpc.py -text
SAS/ResourceAssignment/SystemStatusService/test/test_datamonitorqueueservice_and_rpc.run -text
SAS/ResourceAssignment/SystemStatusService/test/test_datamonitorqueueservice_and_rpc.sh -text
SAS/ResourceAssignment/TaskPrescheduler/CMakeLists.txt -text
SAS/ResourceAssignment/TaskPrescheduler/__init__.py -text
SAS/ResourceAssignment/TaskPrescheduler/prescheduler.py -text
SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler -text
SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.ini -text
SAS/ResourceAssignment/TaskPrescheduler/test/CMakeLists.txt -text
SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.py -text
SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.run -text
SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.sh -text
SAS/Scheduler/src/.default_settings.set -text
SAS/Scheduler/src/LOFAR_libScheduler.pro -text
SAS/Scheduler/src/conflictdialog.ui -text
......
......@@ -11,5 +11,5 @@ lofar_add_package(SystemStatusDatabase)
lofar_add_package(SystemStatusService)
lofar_add_package(OTDBtoRATaskStatusPropagator)
lofar_add_package(RAScripts)
lofar_add_package(TaskPrescheduler)
......@@ -275,6 +275,7 @@ class RATaskSpecified(OTDBBusListener):
key = PARSET_PREFIX + "Observation.processSubtype"
result['task_type'], result['task_subtype'] = convertSchedulerProcessSubtype(parset.get(key, ""))
#TODO probably we only need to do this on state=prescheduled, but then we need a different name for the function?
logger.info("Processing predecessors")
predecessor_ids = self.get_predecessors(parset)
for id in predecessor_ids:
......
# $Id$
lofar_package(TaskPrescheduler 1.0 DEPENDS PyMessaging ResourceAssignmentService OTDB_Services)
lofar_find_package(Python 2.6 REQUIRED)
include(PythonInstall)
set(_py_files
__init__.py
prescheduler.py
)
python_install(${_py_files} DESTINATION lofar/sas/resourceassignment/taskprescheduler)
lofar_add_bin_scripts(taskprescheduler)
# supervisord config files
install(FILES
taskprescheduler.ini
DESTINATION etc/supervisord.d)
# $Id$
#!/usr/bin/python
# $Id$
'''
TODO: add doc
'''
import logging
from datetime import datetime, timedelta
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
logger = logging.getLogger(__name__)
class TaskPrescheduler(OTDBBusListener):
def __init__(self,
otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
otdb_service_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
otdb_service_subject=DEFAULT_OTDB_SERVICENAME,
mom_service_busname=DEFAULT_MOMQUERY_BUSNAME,
mom_service_subject=DEFAULT_MOMQUERY_SERVICENAME,
broker=None, **kwargs):
super(TaskPrescheduler, self).__init__(busname=otdb_notification_busname,
subject=otdb_notification_subject,
broker=broker,
**kwargs)
self.otdb = OTDBRPC(busname=otdb_service_busname, servicename=otdb_service_subject, broker=broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now
self.momquery = MoMQueryRPC(busname=mom_query_busname, servicename=mom_query_servicename, timeout=10, broker)
def start_listening(self, **kwargs):
self.otdb.open()
self.momquery.open()
super(TaskPrescheduler, self).start_listening(**kwargs)
def stop_listening(self, **kwargs):
self.otdb.close()
self.momquery.close()
super(TaskPrescheduler, self).stop_listening(**kwargs)
def onObservationApproved(self, treeId, modificationTime):
""" Updates task specification and puts task on prescheduled if it was generated by a trigger
""" #TODO might work for all tasks in the future
try:
otdb_id = treeId
mom_ids = self.momquery.getMoMIdsForOTDBIds([otdb_id])
if mom_ids:
mom_id = mom_ids[0]
else:
mom_id = None
if len(mom_ids) > 1:
logger.warning('Found multiple mom_ids %s', mom_ids)
if mom_id:
response = self.momquery.get_trigger_id(mom_id)
if response['status'] == 'OK':
logger.info('Found a task mom_id=%s with a trigger_id=%s', mom_id, response['trigger_id'])
#TODO, check for stations and other resources, start/endtime, target position, then update specification
#self.otdb.taskSetSpecification(otdb_id, otdb_info)
logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', otdb_id, response['trigger_id'])
self.otdb.taskSetStatus(otdb_id, 'prescheduled')
else:
logger.info('Did not find a trigger for task mom_id=%s, because %s', mom_id, response['errors'])
else:
logger.info('Did not find a mom_id for task otdb_id=%s', otdb_id)
except Exception as e:
logger.error(e)
def main():
from optparse import OptionParser
from lofar.messaging import setQpidLogLevel
from lofar.common.util import waitForInterrupt
# make sure we run in UTC timezone
import os
os.environ['TZ'] = 'UTC'
# Check the invocation arguments
parser = OptionParser("%prog [options]", description='runs the task prescheduler service')
parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
parser.add_option("--otdb_notification_busname", dest="otdb_notification_busname", type="string",
default=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
help="Bus or queue where the OTDB notifications are published. [default: %default]")
parser.add_option("--otdb_notification_subject", dest="otdb_notification_subject", type="string",
default=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
help="Subject of OTDB notifications on otdb_notification_busname. [default: %default]")
parser.add_option("--momquery_busname", dest="momquery_busname", type="string",
default=DEFAULT_MOMQUERY_BUSNAME,
help="Name of the momquery bus exchange on the qpid broker. [default: %default]")
parser.add_option("--momquery_servicename", dest="momquery_servicename", type="string",
default=DEFAULT_MOMQUERY_SERVICENAME,
help="Name of the momquery service. [default: %default]")
(options, args) = parser.parse_args()
setQpidLogLevel(logging.INFO)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO)
with OTDBtoRATaskStatusPropagator(otdb_notification_busname=options.otdb_notification_busname,
otdb_notification_subject=options.otdb_notification_subject,
radb_busname=options.radb_busname,
radb_servicename=options.radb_servicename,
mom_service_busname=options.momquery_busname,
mom_service_subject=options.momquery_servicename,
broker=options.broker):
waitForInterrupt()
if __name__ == '__main__':
main()
#!/usr/bin/python
# $Id: taskprescheduler 33373 2016-01-22 11:01:15Z schaap $
'''
runs the task prescheduler service
'''
from lofar.sas.resourceassignment.taskprescheduler.prescheduler import main
if __name__ == '__main__':
main()
[program:taskprescheduler]
command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec taskprescheduler'
user=lofarsys
stopsignal=INT ; KeyboardInterrupt
stopasgroup=true ; bash does not propagate signals
stdout_logfile=%(program_name)s.log
redirect_stderr=true
stderr_logfile=NONE
# $Id: CMakeLists.txt 32679 2015-10-26 09:31:56Z schaap $
include(LofarCTest)
include(FindPythonModule)
find_python_module(mock REQUIRED)
lofar_add_test(test_taskprescheduler)
This diff is collapsed.
#!/bin/bash
# Run the unit test
source python-coverage.sh
python_coverage_test "TaskPrescheduler/*" test_taskprescheduler.py
#!/bin/sh
./runctest.sh test_taskprescheduler
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment