Commit c74ca8e6 authored by Jorrit Schaap

Merge branch 'SW-826' into 'LOFAR-Release-4_0'

Resolve SW-826

See merge request ro/lofar!69
parents 5456fe87 69ab7027
......@@ -3,7 +3,7 @@
lofar_package(Common 3.3)
include(LofarFindPackage)
lofar_find_package(Casacore COMPONENTS casa)
# lofar_find_package(Casacore COMPONENTS casa)
lofar_find_package(Boost REQUIRED)
lofar_find_package(Readline)
......
......@@ -108,7 +108,9 @@ namespace LOFAR {
#else
void CasaLogSink::attach()
{}
{
cerr << "WARNING: no casa logging available." << endl;
}
#endif
} // end namespaces
......@@ -983,7 +983,7 @@ class Specification:
self.status = task["status"]
self.type = task["type"]
self.duration = timedelta(seconds = task["duration"])
self.cluster = task["cluster"]
self.cluster = task.get("cluster", "CEP4")
#we don't seem to need specification_id?
logger.info("Read task from RADB: %s", task)
return task
......@@ -1071,7 +1071,7 @@ class Specification:
specification_id = result['specification_id'] # We never seem to need this again
task_id = result['task_id']
logger.info('inserted specification (id=%s) and task (id=%s)' % (specification_id, task_id))
logger.info('inserted/updated specification (id=%s) and task (id=%s)' % (specification_id, task_id))
return task_id
def _link_predecessors_to_task_in_radb(self):
......@@ -1092,24 +1092,30 @@ class Specification:
# check if the predecessor needs to be linked to this task
predecessor_task = self.radb.getTask(mom_id=predecessor_mom_id)
if predecessor_task:
# do Specification-class bookkeeping (stupid, because all info is in the radb already)
predecessor_keys = [p.radb_id for p in self.predecessors]
if predecessor_task['id'] not in predecessor_keys:
logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s '
'otdb_id=%s', predecessor_task['mom_id'], predecessor_task['otdb_id'], self.mom_id,
self.otdb_id)
spec = Specification(self.otdbrpc, self.momquery, self.radb)
spec.read_from_radb(predecessor_task['id'])
self.predecessors.append(spec) # TODO this needs a try/except somewhere?
try:
self.radb.insertTaskPredecessor(self.radb_id, spec.radb_id)
except PostgresDBQueryExecutionError as e:
# task was already connected to predecessor. Log and continue.
if 'task_predecessor_unique' in str(e):
logger.info('predecessor task with mom_id=%s otdb_id=%s was already connected to its successor with mom_id=%s otdb_id=%s',
predecessor_task['mom_id'], predecessor_task['otdb_id'],
self.mom_id, self.otdb_id)
else:
raise
pred_spec = Specification(self.otdbrpc, self.momquery, self.radb)
pred_spec.read_from_radb(predecessor_task['id'])
self.predecessors.append(pred_spec) # TODO this needs a try/except somewhere?
# do radb task-predecessor bookkeeping if needed
try:
task = self.radb.getTask(self.radb_id)
if predecessor_task['id'] not in task['predecessor_ids']:
self.radb.insertTaskPredecessor(self.radb_id, predecessor_task['id'])
except PostgresDBQueryExecutionError as e:
# task was already connected to predecessor. Log and continue.
if 'task_predecessor_unique' in str(e):
logger.info('predecessor task with mom_id=%s otdb_id=%s was already connected to its successor with mom_id=%s otdb_id=%s',
predecessor_task['mom_id'], predecessor_task['otdb_id'],
self.mom_id, self.otdb_id)
else:
raise
else:
# Occurs when setting a pipeline to prescheduled while a predecessor has e.g. never been beyond
# approved, which is in principle valid. The link in the radb will be made later via processSuccessors()
......@@ -1142,16 +1148,20 @@ class Specification:
' otdb_id=%s', successor_task['mom_id'], successor_task['otdb_id'], self.mom_id, self.otdb_id
)
self.successor_ids.append(successor_task['id'])
try:
# do radb task-successor bookkeeping if needed
try:
task = self.radb.getTask(self.radb_id)
if successor_task['id'] not in task['successor_ids']:
self.radb.insertTaskPredecessor(successor_task['id'], self.radb_id)
except PostgresDBQueryExecutionError as e:
# task was already connected to predecessor. Log and continue.
if 'task_predecessor_unique' in str(e):
logger.info('successor task with mom_id=%s otdb_id=%s was already connected to its predecessor with mom_id=%s otdb_id=%s',
successor_task['mom_id'], successor_task['otdb_id'],
self.mom_id, self.otdb_id)
else:
raise
except PostgresDBQueryExecutionError as e:
# task was already connected to predecessor. Log and continue.
if 'task_predecessor_unique' in str(e):
logger.info('successor task with mom_id=%s otdb_id=%s was already connected to its predecessor with mom_id=%s otdb_id=%s',
successor_task['mom_id'], successor_task['otdb_id'],
self.mom_id, self.otdb_id)
else:
raise
movePipelineAfterItsPredecessors(successor_task, self.radb)
else:
......
......@@ -1373,5 +1373,44 @@ class ReadFromRadb(unittest.TestCase):
self.assertEqual(self.specification.duration, timedelta(seconds = task["duration"]))
self.assertEqual(self.specification.cluster, task["cluster"])
def test_insert_into_radb_and_check_predecessors(self):
# Checks that insert_into_radb() registers a task predecessor in the radb only
# when the radb task does not list that predecessor yet: the first insert must
# call insertTaskPredecessor(1, 42), a repeated insert must not call it again.
# NOTE(review): indentation was lost in this view of the file; the nesting of
# the statements below is assumed to follow the usual Arrange/Act/Assert layout.
# Arrange
# Fake radb getTask: echoes the requested id as id, mom_id and otdb_id (falling
# back to mom_id when no id is given) and reports no predecessors at all.
def mock_getTask(id=None, mom_id=None, otdb_id=None, specification_id=None):
if id is None and mom_id is not None:
id = mom_id
return {"id": id, "mom_id": id, "otdb_id": id, "status": "approved", "type": "observation", "duration": 100, "predecessor_ids": []}
self.radbrpc_mock.getTask.side_effect = mock_getTask
self.radbrpc_mock.insertOrUpdateSpecificationAndTask.return_value = {'specification_id': 1, 'task_id': 1}
# presumably maps the mom id '1' to its list of predecessor mom ids -- verify against MoMQueryRPC
self.momrpc_mock.getPredecessorIds.return_value = {'1': [42]}
self.specification.read_from_radb(1)
# Act
self.specification.insert_into_radb()
# Assert
# the radb does not know the predecessor yet, so the link (task 1 -> predecessor 42) must be inserted
self.radbrpc_mock.insertTaskPredecessor.assert_called_with(1, 42)
# Arrange
# now adapt the mock_getTask, and let it return the inserted predecessor_ids as well
def mock_getTask(id=None, mom_id=None, otdb_id=None, specification_id=None):
if id is None and mom_id is not None:
id = mom_id
task = {"id": id, "mom_id": id, "otdb_id": id, "status": "approved", "type": "observation", "duration": 100, "predecessor_ids": []}
if id == 1:
task['predecessor_ids'] = [42]
return task
self.radbrpc_mock.getTask.side_effect = mock_getTask
# forget the call recorded during the first insert, so assert_not_called below only sees the second insert
self.radbrpc_mock.insertTaskPredecessor.reset_mock()
# Act
self.specification.insert_into_radb()
# Assert
# task 1 already lists 42 in its predecessor_ids, so no second insert may happen
self.radbrpc_mock.insertTaskPredecessor.assert_not_called()
if __name__ == "__main__":
unittest.main()
File mode changed from 100644 to 100755
......@@ -28,7 +28,7 @@ selecting the right timeslot and updating start/end time.
import pprint
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler
from lofar.sas.resourceassignment.resourceassigner.rabuslistener import RABusListener, RAEventMessageHandler
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize
......@@ -83,7 +83,7 @@ def cobaltOTDBsettings(cobalt_values):
otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.integrationTime"] = cobalt_values["integrationTime"]
return otdb_info
class TaskPrescheduler(OTDBEventMessageHandler):
class TaskPreschedulerEventHandler(RAEventMessageHandler):
def __init__(self, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
super().__init__()
self.otdbrpc = OTDBRPC.create(exchange=exchange, broker=broker)
......@@ -102,7 +102,7 @@ class TaskPrescheduler(OTDBEventMessageHandler):
self.radb.disconnect()
super().stop_handling()
def onObservationApproved(self, treeId, modificationTime):
def onTaskApproved(self, task_ids):
""" Updates task specification and puts task on prescheduled if it was generated by a trigger
"""
# TODO might work for all tasks in the future
......@@ -115,58 +115,35 @@ class TaskPrescheduler(OTDBEventMessageHandler):
# Maybe these checks need to go into the RATaskSpecified instead.
# NOTE: The MoM predecessor Ids to OTDB predecessor Ids conversion is done in RATaskSpecified on the fly
# otdb_id = treeId
#
# Note: Race condition when asking MoM as the mom-otdb-adapter might not have heard that the
# task is on approved and might still be on approved pending in MoM.
# so don't ask the MomQuery: mom_ids = self.momquery.getMoMIdsForOTDBIds([otdb_id])
# We get the mom_id from the parset
#
# We get the parset for all tasks we receive instead of just for the ones with
# a trigger.
status = "approved"
otdb_id = task_ids['otdb_id']
spec = Specification(self.otdbrpc, self.momquery, self.radb)
spec.set_status(status)
spec.read_from_OTDB_with_predecessors(treeId, "otdb", {}) #Now checks predecessors, which theoretically could cause race conditions
spec.read_from_OTDB_with_predecessors(otdb_id, "otdb", {}) #Now checks predecessors, which theoretically could cause race conditions
spec.read_from_mom()
if spec.status == "error":
return
spec.update_start_end_times()
spec.insert_into_radb()
# if spec.validate()?
if spec.status != status:
return
if not spec.mom_id:
return
if spec.isTriggered():
if spec.isTriggered() and spec.isObservation():
logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', spec.otdb_id, spec.trigger_id)
otdb_info = {}
if spec.isObservation():
cobalt_values = calculateCobaltSettings(spec)
otdb_info.update(cobaltOTDBsettings(cobalt_values))
self.setOTDBinfo(spec.otdb_id, otdb_info, 'prescheduled')
cobalt_values = calculateCobaltSettings(spec)
extended_otdb_specficition = cobaltOTDBsettings(cobalt_values)
logger.info('Extending otdb specification for otdb_id %s with cobalt settings:\n%s',
otdb_id, pprint.pformat(extended_otdb_specficition))
self.otdbrpc.taskSetSpecification(otdb_id, extended_otdb_specficition)
# make sure the task is starting/stopping at the correct times meeting all constraints.
# this call uploads the correct start/stop times to otdb.
spec.update_start_end_times()
logger.info('Setting status to prescheduled for otdb_id %s so the resourceassigner can schedule the observation', otdb_id)
self.otdbrpc.taskSetStatus(otdb_id, 'prescheduled')
else:
logger.info('Did not find a trigger for task mom_id=%s', spec.mom_id)
def setOTDBinfo(self, otdb_id, otdb_info, otdb_status):
"""This function sets the values in otdb_info in OTDB, almost a copy from the RAtoOTDBPropagator"""
# otdb_id:     id of the OTDB task to update
# otdb_info:   specification key/value pairs passed on to otdbrpc.taskSetSpecification
# otdb_status: status string passed on to otdbrpc.taskSetStatus
# Any exception raised while talking to OTDB is logged and the task is set to 'error' in the radb.
# NOTE(review): indentation was lost in this view of the file; it is assumed that only the
# specification upload is guarded by `if otdb_info:` and that the status is always set -- confirm.
try:
# skip the specification upload when there is nothing to upload
if otdb_info:
logger.info('Setting specification for otdb_id %s:\n', otdb_id)
logger.info(pprint.pformat(otdb_info))
self.otdbrpc.taskSetSpecification(otdb_id, otdb_info)
#We probably will need this as well
#self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"],
# otdb_info["LOFAR.ObsSW.Observation.stopTime"])
logger.info('Setting status (%s) for otdb_id %s', otdb_status, otdb_id)
self.otdbrpc.taskSetStatus(otdb_id, otdb_status)
except Exception as e:
# log the failure with traceback, then flag the task as failed in the radb
logger.exception(e)
logger.error("Problem setting specification or status in OTDB for otdb_id=%s", otdb_id)
self.radb.updateTaskStatusForOtdbId(otdb_id, 'error') # We don't catch an exception if this fails.
def main():
from optparse import OptionParser
from lofar.common.util import waitForInterrupt
......@@ -178,7 +155,7 @@ def main():
# Check the invocation arguments
parser = OptionParser("%prog [options]", description='runs the task prescheduler service')
parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
help='Address of the qpid broker, default: localhost')
help='Address of the qpid broker, default: %default')
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
parser.add_option('-e', '--exchange', dest='exchange', type='string',
default=DEFAULT_BUSNAME,
......@@ -188,10 +165,10 @@ def main():
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO)
with OTDBBusListener(handler_type=TaskPrescheduler,
exchange=options.exchange,
broker=options.broker,
num_threads=1):
with RABusListener(handler_type=TaskPreschedulerEventHandler,
exchange=options.exchange,
broker=options.broker,
num_threads=1):
waitForInterrupt()
if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment