Skip to content
Snippets Groups Projects
Commit 097321af authored by Adriaan Renting's avatar Adriaan Renting
Browse files

Task #11100: changes to handle various sources of start/endtime duration and such.

parent 72fbc830
Branches
Tags
No related merge requests found
......@@ -49,6 +49,7 @@ from lofar.common.util import waitForInterrupt
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.common import dbcredentials
from lofar.common.util import convertIntKeysToString
from lofar.common.datetimeutils import parseDatetime
logger=logging.getLogger(__file__)
......@@ -1066,67 +1067,32 @@ where project.mom2id = %s and (project_role.name = "Pi" or project_role.name = "
return misc
def get_time_restrictions(self, mom_id):
def get_trigger_time_restrictions(self, mom_id):
"""
Returns min start and max end times and min/max duration for given mom id.
:param mom_id: int
:return: dict
"""
logger.info("get_time_restrictions for mom_id: %s", mom_id)
time_restrictions = {}
# Note: this duplicates the _get_misc_contents(), but we save a (quite expensive?) additional query if we
# handle this together with duration:
query = """SELECT mom.mom2id, mom.mom2objecttype, obs_spec.misc, obs_spec.spec_duration AS duration,
obs_spec.starttime, obs_spec.endtime
FROM mom2object as mom
join lofar_observation as obs on mom.mom2objecttype = "LOFAR_OBSERVATION" and mom.id = obs.mom2objectid
join lofar_observation_specification as obs_spec on
mom.mom2objecttype = "LOFAR_OBSERVATION" and obs.user_specification_id = obs_spec.id
where mom.mom2id=%s
union
SELECT mom.mom2id, mom.mom2objecttype, pipeline.misc, pipeline.duration AS duration,
pipeline.starttime, pipeline.endtime
FROM mom2object as mom
join lofar_pipeline as pipeline on mom.mom2objecttype like "%PIPELINE%"
and mom.id = pipeline.mom2objectid
where mom.mom2id=%s;"""
parameters = (mom_id, mom_id)
rows = self._executeSelectQuery(query, parameters)
if rows is None or len(rows) == 0:
logger.info("get_trigger_time_restrictions for mom_id: %s", mom_id)
result = {"minStartTime": None, "minDuration": None, "maxDuration": None, "maxEndTime": None, "trigger_id": None}
misc = self._get_misc_contents(mom_id)
if misc is None:
raise ValueError("mom_id (%s) not found in MoM database" % mom_id)
# add timewindow to response, if present
misc_json = rows[0]['misc']
if misc_json is not None:
misc = json.loads(misc_json)
if 'timeWindow' in misc:
time_restrictions.update(misc['timeWindow'])
# use mom db duration and startime to fill missing info (all strings)
starttime = rows[0]['starttime'] # datetime
endtime = rows[0]['endtime'] # datetime
duration = rows[0]['duration'] # float
if ('minDuration' not in time_restrictions or 'maxDuration' not in time_restrictions) and duration:
time_restrictions['minDuration'] = time_restrictions['maxDuration'] = str(int(duration))
if 'minStartTime' not in time_restrictions or 'maxEndTime' not in time_restrictions:
time_formatting = "%Y-%m-%dT%H:%M:%S"
if starttime:
time_restrictions['minStartTime'] = starttime.strftime(time_formatting)
if endtime:
time_restrictions['maxEndTime'] = endtime.strftime(time_formatting)
elif starttime and duration:
max_endtime = starttime + timedelta(seconds=int(duration))
time_restrictions['maxEndTime'] = max_endtime.strftime(time_formatting)
if len(time_restrictions) == 0:
raise ValueError("No time restrictions for mom_id (%s) in database" % mom_id)
logger.info("get_time_restrictions for mom_id (%s): %s", mom_id, time_restrictions)
return time_restrictions
time_window = misc['timeWindow']
if "trigger_id" in misc:
result["trigger_id"] = misc['trigger_id']
else:
raise NotImplementedError("TimeWindow specified for a non-triggered observation %s", mom_id)
if "minStartTime" in time_window:
result["minStartTime"] = parseDatetime(time_window["minStartTime"].replace('T', ' ')) # The T is from XML
if "maxDuration" in time_window:
result["maxDuration"] = timedelta(seconds=int(time_window["maxDuration"]))
if "minDuration" in time_window:
result["minDuration"] = timedelta(seconds=int(time_window["minDuration"]))
if "maxEndTime" in time_window:
result["maxEndTime"] = parseDatetime(time_window["maxEndTime"].replace('T', ' ')) # The T is from XML
return result
def get_station_selection(self, mom_id):
"""
......@@ -1148,7 +1114,6 @@ where project.mom2id = %s and (project_role.name = "Pi" or project_role.name = "
return station_selection
class ProjectDetailsQueryHandler(MessageHandlerInterface):
"""
handler class for details query in mom db
......@@ -1184,7 +1149,7 @@ class ProjectDetailsQueryHandler(MessageHandlerInterface):
'GetTaskIdsGraph': self.getTaskIdsGraph,
'GetProjectPrioritiesForObjects': self.get_project_priorities_for_objects,
'GetStationSelection': self.get_station_selection,
'GetTimeRestrictions': self.get_time_restrictions
'GetTimeRestrictions': self.get_trigger_time_restrictions
}
def prepare_loop(self):
......@@ -1277,8 +1242,8 @@ class ProjectDetailsQueryHandler(MessageHandlerInterface):
def getTaskIdsGraph(self, mom2id):
return convertIntKeysToString(self.momdb.getTaskIdsGraph(mom2id))
def get_time_restrictions(self, mom_id):
return self.momdb.get_time_restrictions(mom_id)
def get_trigger_time_restrictions(self, mom_id):
return self.momdb.get_trigger_time_restrictions(mom_id)
def get_station_selection(self, mom_id):
return self.momdb.get_station_selection(mom_id)
......
......@@ -24,6 +24,7 @@ from datetime import datetime
from mysql import connector
import logging
import json
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
......@@ -518,48 +519,48 @@ class TestProjectDetailsQueryHandler(unittest.TestCase):
self.assertEqual(len(return_value['trigger_spec']),
len(trigger_specification))
def test_get_trigger_id_returns_trigger_id_when_mom_wrapper_returns_an_id(self):
trigger_id = 1234
self.mom_database_wrapper_mock().get_trigger_id.return_value = trigger_id
return_value = self.project_details_query_handler.get_trigger_id(5432)
self.assertEqual(return_value['trigger_id'], trigger_id)
def test_get_trigger_id_returns_status_ok_when_mom_wrapper_returns_an_id(self):
trigger_id = 1234
self.mom_database_wrapper_mock().get_trigger_id.return_value = trigger_id
return_value = self.project_details_query_handler.get_trigger_id(5432)
self.assertEqual(return_value['status'], "OK")
def test_get_trigger_id_returns_status_error_when_mom_wrapper_returns_none(self):
self.mom_database_wrapper_mock().get_trigger_id.return_value = None
return_value = self.project_details_query_handler.get_trigger_id(5432)
self.assertEqual(return_value['status'], "Error")
def test_get_trigger_id_returns_error_when_mom_wrapper_returns_none(self):
mom_id = 5432
self.mom_database_wrapper_mock().get_trigger_id.return_value = None
return_value = self.project_details_query_handler.get_trigger_id(mom_id)
self.assertEqual(return_value['errors'][0], "No trigger_id for mom_id: " + str(mom_id))
def test_get_project_details_returns_author_email(self):
author_email = "author@email.com"
self.mom_database_wrapper_mock().get_project_details.return_value = \
{"author_email": author_email, "pi_email": "pi@email.com"}
return_value = self.project_details_query_handler.get_project_details(24343)
self.assertEqual(return_value["author_email"], author_email)
# def test_get_trigger_id_returns_trigger_id_when_mom_wrapper_returns_an_id(self):
# trigger_id = 1234
#
# self.mom_database_wrapper_mock().get_trigger_id.return_value = trigger_id
#
# return_value = self.project_details_query_handler.get_trigger_id(5432)
#
# self.assertEqual(return_value['trigger_id'], trigger_id)
#
# def test_get_trigger_id_returns_status_ok_when_mom_wrapper_returns_an_id(self):
# trigger_id = 1234
#
# self.mom_database_wrapper_mock().get_trigger_id.return_value = trigger_id
#
# return_value = self.project_details_query_handler.get_trigger_id(5432)
#
# self.assertEqual(return_value['status'], "OK")
#
# def test_get_trigger_id_returns_status_error_when_mom_wrapper_returns_none(self):
# self.mom_database_wrapper_mock().get_trigger_id.return_value = None
#
# return_value = self.project_details_query_handler.get_trigger_id(5432)
#
# self.assertEqual(return_value['status'], "Error")
#
# def test_get_trigger_id_returns_error_when_mom_wrapper_returns_none(self):
# mom_id = 5432
#
# self.mom_database_wrapper_mock().get_trigger_id.return_value = None
#
# return_value = self.project_details_query_handler.get_trigger_id(mom_id)
#
# self.assertEqual(return_value['errors'][0], "No trigger_id for mom_id: " + str(mom_id))
#
# def test_get_project_details_returns_author_email(self):
# author_email = "author@email.com"
# self.mom_database_wrapper_mock().get_project_details.return_value = \
# {"author_email": author_email, "pi_email": "pi@email.com"}
#
# return_value = self.project_details_query_handler.get_project_details(24343)
#
# self.assertEqual(return_value["author_email"], author_email)
def test_get_project_details_returns_pi_email(self):
pi_email = "pi@email.com"
......@@ -570,22 +571,22 @@ class TestProjectDetailsQueryHandler(unittest.TestCase):
self.assertEqual(return_value["pi_email"], pi_email)
def test_get_time_restrictions_returns_what_the_mom_wrapper_returns(self):
min_start_time = "2017-01-01"
max_end_time = "2017-01-02"
def test_get_trigger_time_restrictions_returns_what_the_mom_wrapper_returns(self):
min_start_time = "2017-01-01T12:00:00"
max_end_time = "2017-01-02T01:00:03.0000"
min_duration = 300
max_duration = 600
self.mom_database_wrapper_mock().get_time_restrictions.return_value = \
self.mom_database_wrapper_mock().get_trigger_time_restrictions.return_value = \
{"minStartTime": min_start_time, "maxEndTime": max_end_time,
"minDuration": min_duration, "maxDuration": max_duration}
result = self.project_details_query_handler.get_time_restrictions(1234)
result = self.project_details_query_handler.get_trigger_time_restrictions(1234)
self.assertEqual(result["minStartTime"], min_start_time)
self.assertEqual(result["maxEndTime"], max_end_time)
self.assertEqual(result["minDuration"], min_duration)
self.assertEqual(result["maxDuration"], max_duration)
self.assertEqual(result["minStartTime"], datetime(year=2017, month=1, day=1, hour=12))
self.assertEqual(result["maxEndTime"], datetime(year=2017, month=1, day=2, hour=1, minute=3))
self.assertEqual(result["minDuration"], timedelta(seconds=300))
self.assertEqual(result["maxDuration"], timedelta(seconds=600))
def test_get_station_selection_returns_what_the_mom_wrapper_returns(self):
resource_group = "SuperTerp"
......@@ -1177,48 +1178,48 @@ class TestMomQueryRPC(unittest.TestCase):
self.assertEqual(len(result['trigger_spec']),
len(trigger_specification))
@mock.patch('lofar.messaging.messagebus.qpid.messaging')
def test_get_trigger_id_logs_before_query(self, qpid_mock):
self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_id
mom_id = 6789
qpid_mock.Message = QpidMessage
qpid_mock.Connection().session().senders = [self.sender_mock]
qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock
self.momrpc.get_trigger_id(mom_id)
self.logger_mock.info.assert_any_call("Requesting GetTriggerId for mom_id: %s", mom_id)
@mock.patch('lofar.messaging.messagebus.qpid.messaging')
def test_get_trigger_id_logs_after_query(self, qpid_mock):
self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_id
mom_id = 6789
qpid_mock.Message = QpidMessage
qpid_mock.Connection().session().senders = [self.sender_mock]
qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock
result = self.momrpc.get_trigger_id(mom_id)
self.logger_mock.info.assert_any_call("Received trigger_id: %s", result)
@mock.patch('lofar.messaging.messagebus.qpid.messaging')
def test_get_trigger_id_query(self, qpid_mock):
self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_id
mom_id = 6789
qpid_mock.Message = QpidMessage
qpid_mock.Connection().session().senders = [self.sender_mock]
qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock
result = self.momrpc.get_trigger_id(mom_id)
self.assertEqual(result["trigger_id"], self.trigger_id)
self.assertEqual(result["status"], "OK")
# @mock.patch('lofar.messaging.messagebus.qpid.messaging')
# def test_get_trigger_id_logs_before_query(self, qpid_mock):
# self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_id
#
# mom_id = 6789
#
# qpid_mock.Message = QpidMessage
# qpid_mock.Connection().session().senders = [self.sender_mock]
# qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock
#
# self.momrpc.get_trigger_id(mom_id)
#
# self.logger_mock.info.assert_any_call("Requesting GetTriggerId for mom_id: %s", mom_id)
#
# @mock.patch('lofar.messaging.messagebus.qpid.messaging')
# def test_get_trigger_id_logs_after_query(self, qpid_mock):
# self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_id
#
# mom_id = 6789
#
# qpid_mock.Message = QpidMessage
# qpid_mock.Connection().session().senders = [self.sender_mock]
# qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock
#
# result = self.momrpc.get_trigger_id(mom_id)
#
# self.logger_mock.info.assert_any_call("Received trigger_id: %s", result)
#
# @mock.patch('lofar.messaging.messagebus.qpid.messaging')
# def test_get_trigger_id_query(self, qpid_mock):
# self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_id
#
# mom_id = 6789
#
# qpid_mock.Message = QpidMessage
# qpid_mock.Connection().session().senders = [self.sender_mock]
# qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock
#
# result = self.momrpc.get_trigger_id(mom_id)
#
# self.assertEqual(result["trigger_id"], self.trigger_id)
# self.assertEqual(result["status"], "OK")
@mock.patch('lofar.messaging.messagebus.qpid.messaging')
def test_get_project_details_logs_before_query(self, qpid_mock):
......@@ -1729,13 +1730,10 @@ class TestMoMDatabaseWrapper(unittest.TestCase):
self.mom_database_wrapper.get_station_selection(1234)
def test_get_time_restrictions_returns_misc_field_info_from_query_result(self):
min_start_time = u"2017-01-01"
max_end_time = u"2017-01-04"
mom_starttime = u"2017-01-02"
mom_endtime = u"2017-01-03"
min_start_time = u"2017-01-01T12:00:00"
max_end_time = u"2017-01-04T01:00:00"
min_duration = 300
max_duration = 600
mom_duration = 400
timewindow = {u"minStartTime": min_start_time,
u"maxEndTime": max_end_time,
......@@ -1744,38 +1742,37 @@ class TestMoMDatabaseWrapper(unittest.TestCase):
expected_result = timewindow
details_result = [{u"mom2id": self.mom_id, u"mom2objecttype": self.job_type,
u"misc": json.dumps({u"timeWindow": timewindow}), u"duration": mom_duration,
u"starttime": mom_starttime, u"endtime": mom_endtime}]
u"misc": json.dumps({u"timeWindow": timewindow})}]
self.mysql_mock.connect().cursor().fetchall.return_value = details_result
result = self.mom_database_wrapper.get_time_restrictions(self.mom_id)
result = self.mom_database_wrapper.get_trigger_time_restrictions(self.mom_id)
self.assertEqual(result, expected_result)
def test_get_time_restrictions_returns_mom_info_if_misc_empty_in_query_result(self):
time_format = "%Y-%m-%dT%H:%M:%S"
mom_starttime = datetime.strptime("2017-01-02T00:00:00", time_format)
mom_endtime = datetime.strptime("2017-01-03T00:00:00", time_format)
mom_duration = 400
expected_result = {u"minStartTime": mom_starttime.strftime(time_format),
u"maxEndTime": mom_endtime.strftime(time_format),
u"minDuration": str(mom_duration),
u"maxDuration": str(mom_duration)}
details_result = [{u"mom2id": self.mom_id, u"mom2objecttype": self.job_type,
u"misc": None, u"duration": mom_duration,
u"starttime": mom_starttime, u"endtime": mom_endtime}]
self.mysql_mock.connect().cursor().fetchall.return_value = details_result
result = self.mom_database_wrapper.get_time_restrictions(self.mom_id)
self.assertEqual(result, expected_result)
# def test_get_time_restrictions_returns_mom_info_if_misc_empty_in_query_result(self):
# time_format = "%Y-%m-%dT%H:%M:%S"
# mom_starttime = datetime.strptime("2017-01-02T00:00:00", time_format)
# mom_endtime = datetime.strptime("2017-01-03T00:00:00", time_format)
# mom_duration = 400
#
# expected_result = {u"minStartTime": mom_starttime.strftime(time_format),
# u"maxEndTime": mom_endtime.strftime(time_format),
# u"minDuration": str(mom_duration),
# u"maxDuration": str(mom_duration)}
#
# details_result = [{u"mom2id": self.mom_id, u"mom2objecttype": self.job_type,
# u"misc": None, u"duration": mom_duration,
# u"starttime": mom_starttime, u"endtime": mom_endtime}]
# self.mysql_mock.connect().cursor().fetchall.return_value = details_result
#
# result = self.mom_database_wrapper.get_trigger_time_restrictions(self.mom_id)
# self.assertEqual(result, expected_result)
def test_get_time_restrictions_throws_ValueError_on_empty_query_result(self):
self.mysql_mock.connect().cursor().fetchall.return_value = []
with self.assertRaises(ValueError):
self.mom_database_wrapper.get_time_restrictions(1234)
self.mom_database_wrapper.get_trigger_time_restrictions(1234)
def test_get_time_restrictions_throws_ValueError_if_no_time_restrictions_in_query_result(self):
details_result = [{u"mom2id": self.mom_id, u"mom2objecttype": self.job_type,
......@@ -1784,7 +1781,7 @@ class TestMoMDatabaseWrapper(unittest.TestCase):
self.mysql_mock.connect().cursor().fetchall.return_value = details_result
with self.assertRaises(ValueError):
self.mom_database_wrapper.get_time_restrictions(1234)
self.mom_database_wrapper.get_trigger_time_restrictions(1234)
@unittest.skip("Skipping integration test")
......@@ -2201,7 +2198,7 @@ class IntegrationTestMoMDatabaseWrapper(unittest.TestCase):
def test_get_time_restrictions_throws_ValueError_on_empty_database(self):
with self.assertRaises(ValueError):
self.mom_database_wrapper.get_time_restrictions(1234)
self.mom_database_wrapper.get_trigger_time_restrictions(1234)
def test_get_time_restrictions_throws_ValueError_if_no_time_restrictions_in_database(self):
......@@ -2218,7 +2215,7 @@ class IntegrationTestMoMDatabaseWrapper(unittest.TestCase):
"16, NULL)")
with self.assertRaises(ValueError):
self.mom_database_wrapper.get_time_restrictions(2)
self.mom_database_wrapper.get_trigger_time_restrictions(2)
def test_get_time_restrictions_returns_correct_time_restrictions(self):
min_start_time = "2017-01-01"
......@@ -2240,7 +2237,7 @@ class IntegrationTestMoMDatabaseWrapper(unittest.TestCase):
"\"maxDuration\": %s}}')"
% (min_duration, min_start_time, max_end_time, min_duration, max_duration))
result = self.mom_database_wrapper.get_time_restrictions(2)
result = self.mom_database_wrapper.get_trigger_time_restrictions(2)
self.assertEqual(result["minStartTime"], min_start_time)
self.assertEqual(result["maxEndTime"], max_end_time)
......@@ -2262,7 +2259,7 @@ class IntegrationTestMoMDatabaseWrapper(unittest.TestCase):
"NULL, %s, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, NULL, "
"16, NULL)" % duration)
result = self.mom_database_wrapper.get_time_restrictions(2)
result = self.mom_database_wrapper.get_trigger_time_restrictions(2)
self.assertEqual(result["minDuration"], duration)
self.assertEqual(result["maxDuration"], duration)
......
......@@ -40,6 +40,7 @@ OUTPUT_PREFIX = "LOFAR.ObsSW."
# refactoring.
#TODO There are lot's of hardcoded OTDB key strings and values in here.
#TODO Maybe all/most the static methods should log what they're doing and no longer be static?
#TODO We can do more direct updating of OTDB and MoM from here, especially the MoM sytem spec.
class Specification:
def __init__(self, logger, otdbrpc, momquery, radbrpc):
......@@ -68,13 +69,14 @@ class Specification:
self.otdb_id = None # Task Id in OTDB
self.mom_id = None # Task Id in MoM
self.radb_id = None # Task Id in RADB
self.trigger_id = None # Id of trigger is this was specified in a trigger
self.type = None # Task type in RADB
self.subtype = None # Task type in RADB
self.status = None # Task status, as used in OTDB/MoM.
#Inputs for the scheduler
self.min_starttime = None
self.max_starttime = None #TODO do we need this one?
#self.max_starttime = None # We return this from calculate_dwell_values
self.max_endtime = None
self.min_duration = timedelta(seconds = 0)
self.max_duration = timedelta(seconds = 0)
......@@ -96,11 +98,12 @@ class Specification:
result["otdb_id"] = self.otdb_id
result["mom_id"] = self.mom_id
result["task_id"] = self.radb_id
result["trigger_id"] = self.trigger_id
result["status"] = self.status
result["task_type"] = self.type
result["task_subtype"] = self.subtype
result["min_starttime"] = str(self.min_starttime)
result["max_starttime"] = str(self.max_starttime)
#result["max_starttime"] = str(self.max_starttime)
result["max_endtime"] = str(self.max_endtime)
result["min_duration"] = str(self.min_duration)
result["max_duration"] = str(self.max_duration)
......@@ -109,7 +112,7 @@ class Specification:
result["duration"] = self.duration.total_seconds()
result["cluster"] = self.cluster
result["specification"] = self.internal_dict
result["specification"]["Observation.startTime"] = str(self.starttime) #TODO set/update these somewhere else
result["specification"]["Observation.startTime"] = str(self.starttime) #TODO set/update these somewhere else?
result["specification"]["Observation.stopTime"] = str(self.endtime)
result["predecessors"] = []
for p in self.predecessors:
......@@ -125,11 +128,12 @@ class Specification:
self.otdb_id = input_dict["otdb_id"]
self.mom_id = input_dict["mom_id"]
self.radb_id = input_dict["task_id"]
self.trigger_id = input_dict["trigger_id"]
self.status = input_dict["status"]
self.type = input_dict["task_type"]
self.subtype = input_dict["task_subtype"]
self.min_starttime = parseDatetime(input_dict["min_starttime"])
self.max_starttime = parseDatetime(input_dict["max_starttime"])
#self.max_starttime = parseDatetime(input_dict["max_starttime"])
self.endtime = parseDatetime(input_dict["endtime"])
self.duration = timedelta(seconds = input_dict["duration"])
self.min_duration = timedelta(seconds = input_dict["min_duration"])
......@@ -159,19 +163,30 @@ class Specification:
""""Read specification values from the MoM database, mostly the misc field time restrictions
Tries to set min_starttime, max_endtime, min_duration, max_duration, if the Specification has a mom_id
example input:
timeWindow": {"minStartTime": "2017-05-23T15:21:44", "maxEndTime": "2017-11-23T15:21:44", "minDuration": 1600, "maxDuration": 7200}
"""
if self.mom_id: #TODO cleanup and make matching with momquery
time_restrictions = self.momquery.get_time_restrictions(self.mom_id)
self.logger.info("Received time restrictions from MoM specs: %s", time_restrictions)
if time_restrictions:
self.min_starttime = parseDatetime(time_restrictions["minStartTime"].replace('T', ' ')) #This might not work for XML
if "minDuration" in time_restrictions:
self.duration = timedelta(seconds=int(time_restrictions["minDuration"]))
if "maxEndTime" in time_restrictions:
max_end_time = parseDatetime(time_restrictions["maxEndTime"].replace('T', ' '))#This might not work for XML
self.end_time = max_end_time
self.max_starttime = self.end_time - self.duration # TODO huh? This doesn't seem to match DwellScheduler specs
"""
if self.mom_id:
# We might need more than the misc field in the future.
# Right now we assume start/end times from OTDB always have priority for example.
try:
misc = self.momquery.get_trigger_time_restrictions(self.mom_id)
self.logger.info("Received misc field from MoM: %s", misc)
if misc:
if "minStartTime" in misc:
self.min_starttime = misc["minStartTime"]
if "minDuration" in misc:
self.max_duration = misc["maxDuration"]
if "maxDuration" in misc:
self.min_duration = misc["minDuration"]
if "maxEndTime" in misc:
self.max_endtime = misc["maxEndTime"]
if "trigger_id" in misc:
self.trigger_id = misc["trigger_id"]
self.logger.info('Found a task mom_id=%s with a trigger_id=%s', self.mom_id, self.trigger_id)
except Exception as e:
self.logger.exception("read_from_mom: " + str(e), exc_info=True)
self.set_status("error")
else:
self.logger.info("This task does not have a mom_id.")
# ========================= parset/OTDB related methods =======================================================================
......@@ -776,6 +791,7 @@ class Specification:
# =========================================== RADB related methods =======================================================
#TODO we might need read_from_radb_with_predecessors in the future
def read_from_radb(self, radb_id):
"""The returned task is from "SELECT * from resource_allocation.task_view tv" with start/endtime
converted to a datetime. This only reads a few values from the RADB, not the whole specification.
......@@ -783,10 +799,10 @@ class Specification:
:return: task dict (radb_id, mom_id, otdb_id, status_id, type_id, specification_id, status, type,
starttime, endtime, duration, cluster) or None
"""
#TODO maybe this should read the spec as well? Mostly here for backward compatibility, not sure if still needed
#TODO maybe this should read the spec as well? Can especially start/end times be updated outside of here?
task = self.radbrpc.getTask(radb_id) # if radb_id is not None else None
if task: #TODO what is the exact structure of task see schedulerchecker 47
self.radb_id = task["id"]
self.radb_id = task["id"] # Should be the same as radb_id, but self.radb_id might not yet be set
self.mom_id = task["mom_id"]
self.otdb_id = task["otdb_id"]
self.status = task["status"]
......@@ -815,8 +831,8 @@ class Specification:
task = self.read_from_radb(self.radb_id)
self._link_predecessors_to_task_in_radb(task)
self._link_successors_to_task_in_radb(task)
self._link_predecessors_to_task_in_radb()
self._link_successors_to_task_in_radb()
self.logger.info('Successfully inserted main task and its predecessors and successors into RADB: task=%s', task)
......@@ -879,49 +895,47 @@ class Specification:
self.logger.info('inserted specification (id=%s) and task (id=%s)' % (specification_id, task_id))
return task_id
def _link_predecessors_to_task_in_radb(self, task):
def _link_predecessors_to_task_in_radb(self):
"""
Links a task to its predecessors in RADB
:param task: the task at hand
"""
#TODO how to keep the predecessors in MoM and in OTDB in sync here? Does it matter?
#FIXME Not sure if this actually works, task['predecessor_ids'] might no longer be set
#TODO refactor for this to use Specification instead of task?
mom_id = task['mom_id']
predecessor_ids = self.momquery.getPredecessorIds(mom_id)
if str(mom_id) not in predecessor_ids or not predecessor_ids[str(mom_id)]:
self.logger.info('no predecessors for otdb_id=%s mom_id=%s', task['otdb_id'], mom_id)
predecessor_ids = self.momquery.getPredecessorIds(self.mom_id)
if str(self.mom_id) not in predecessor_ids or not predecessor_ids[str(self.mom_id)]:
self.logger.info('no predecessors for otdb_id=%s mom_id=%s', self.otdb_id, self.mom_id)
return
predecessor_mom_ids = predecessor_ids[str(mom_id)]
predecessor_mom_ids = predecessor_ids[str(self.mom_id)]
self.logger.info('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, task['mom_id'],
task['otdb_id'])
self.logger.info('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, self.mom_id, self.otdb_id)
for predecessor_mom_id in predecessor_mom_ids:
# check if the predecessor needs to be linked to this task
predecessor_task = self.radbrpc.getTask(mom_id=predecessor_mom_id)
if predecessor_task:
if predecessor_task['id'] not in task['predecessor_ids']:
predecessor_keys = [p.radb_id for p in self.predecessors]
if predecessor_task['id'] not in predecessor_keys:
self.logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s '
'otdb_id=%s', predecessor_task['mom_id'], predecessor_task['otdb_id'], task['mom_id'],
task['otdb_id'])
self.radbrpc.insertTaskPredecessor(task['id'], predecessor_task['id'])
'otdb_id=%s', predecessor_task['mom_id'], predecessor_task['otdb_id'], self.mom_id,
self.otdb_id)
spec = Specification(self.logger, self.otdbrpc, self.momquery, self.radbrpc)
spec.read_from_radb(predecessor_task['id'])
self.predecessors.append(spec) # TODO this needs a try/except somewhere?
self.radbrpc.insertTaskPredecessor(self.radb_id, spec.radb_id)
else:
# Occurs when setting a pipeline to prescheduled while a predecessor has e.g. never been beyond
# approved, which is in principle valid. The link in the radb will be made later via processSuccessors()
# below. Alternatively, a predecessor could have been deleted.
self.logger.warning('could not find predecessor task with mom_id=%s in radb for task otdb_id=%s',
predecessor_mom_id, task['otdb_id'])
predecessor_mom_id, self.otdb_id)
def _link_successors_to_task_in_radb(self, task):
def _link_successors_to_task_in_radb(self):
"""
Links a task to its successors in RADB
"""
#TODO refactor for this to use Specification instead of task?
#FIXME Not usre if this works, as task['successor_ids'] might not be set
#FIXME Not sure if this works, as self.successor_ids might not be set outside of here
successor_ids = self.momquery.getSuccessorIds(self.mom_id)
if str(self.mom_id) not in successor_ids or not successor_ids[str(self.mom_id)]:
......@@ -936,14 +950,13 @@ class Specification:
# check if the successor needs to be linked to this task
successor_task = self.radbrpc.getTask(mom_id=successor_mom_id)
if successor_task:
if successor_task['id'] not in task['successor_ids']:
if successor_task['id'] not in self.successor_ids:
self.logger.info(
'connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s'
' otdb_id=%s', successor_task['mom_id'], successor_task['otdb_id'], task['mom_id'],
task['otdb_id']
' otdb_id=%s', successor_task['mom_id'], successor_task['otdb_id'], self.mom_id, self.otdb_id
)
self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id'])
self.successor_ids.append(successor_task['id'])
self.radbrpc.insertTaskPredecessor(successor_task['id'], self.radb_id)
movePipelineAfterItsPredecessors(successor_task, self.radbrpc)
else:
......@@ -951,4 +964,4 @@ class Specification:
# approved, which is quite normal. The link in the radb will be made later via processPredecessors()
# above. Alternatively, a successor could have been deleted.
self.logger.warning('could not find successor task with mom_id=%s in radb for task otdb_id=%s',
successor_mom_id, task['otdb_id'])
successor_mom_id, self.otdb_id)
......@@ -158,6 +158,7 @@ class TaskPrescheduler(OTDBBusListener):
spec = Specification(logger, self.otdbrpc, self.momquery, self.radbrpc)
spec.status = status
spec.read_from_otdb(treeId)
spec.read_from_mom()
spec.update_start_end_times()
spec.insert_into_radb()
# if spec.validate()?
......@@ -165,20 +166,15 @@ class TaskPrescheduler(OTDBBusListener):
return
if not spec.mom_id:
return
response = self.momquery.get_trigger_id(spec.mom_id)
if response['status'] == 'OK':
logger.info('Found a task mom_id=%s with a trigger_id=%s', spec.mom_id, response['trigger_id'])
#TODO, check for stations and other resources, start/endtime, target position, then update specification
logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', spec.otdb_id, response['trigger_id'])
if spec.trigger_id:
logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', spec.otdb_id, spec.trigger_id)
otdb_info = {}
if spec.isObservation():
cobalt_values = calculateCobaltSettings(spec)
otdb_info.update(cobaltOTDBsettings(cobalt_values))
self.setOTDBinfo(spec.otdb_id, otdb_info, 'prescheduled')
else:
logger.info('Did not find a trigger for task mom_id=%s, because %s', spec.mom_id, response['errors'])
logger.info('Did not find a trigger for task mom_id=%s, because %s', spec.mom_id)
def setOTDBinfo(self, otdb_id, otdb_info, otdb_status):
"""This function sets the values in otdb_info in OTDB, almost a copy from the RAtoOTDBPropagator"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment