diff --git a/SAS/MoM/MoMQueryService/MoMQueryServiceServer/momqueryservice.py b/SAS/MoM/MoMQueryService/MoMQueryServiceServer/momqueryservice.py index 272b2c16f5e31381a9a9382f6db0bbc2b08e09c2..de05c060e19e3a9fa7d3c64c797926711f2c4f2a 100755 --- a/SAS/MoM/MoMQueryService/MoMQueryServiceServer/momqueryservice.py +++ b/SAS/MoM/MoMQueryService/MoMQueryServiceServer/momqueryservice.py @@ -49,6 +49,7 @@ from lofar.common.util import waitForInterrupt from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME from lofar.common import dbcredentials from lofar.common.util import convertIntKeysToString +from lofar.common.datetimeutils import parseDatetime logger=logging.getLogger(__file__) @@ -1066,67 +1067,32 @@ where project.mom2id = %s and (project_role.name = "Pi" or project_role.name = " return misc - def get_time_restrictions(self, mom_id): + def get_trigger_time_restrictions(self, mom_id): """ Returns min start and max end times and min/max duration for given mom id. :param mom_id: int - :return: dict - """ - logger.info("get_time_restrictions for mom_id: %s", mom_id) - - time_restrictions = {} - - # Note: this duplicates the _get_misc_contents(), but we save a (quite expensive?) additional query if we - # handle this together with duration: - query = """SELECT mom.mom2id, mom.mom2objecttype, obs_spec.misc, obs_spec.spec_duration AS duration, - obs_spec.starttime, obs_spec.endtime - FROM mom2object as mom - join lofar_observation as obs on mom.mom2objecttype = "LOFAR_OBSERVATION" and mom.id = obs.mom2objectid - join lofar_observation_specification as obs_spec on - mom.mom2objecttype = "LOFAR_OBSERVATION" and obs.user_specification_id = obs_spec.id - where mom.mom2id=%s - union - SELECT mom.mom2id, mom.mom2objecttype, pipeline.misc, pipeline.duration AS duration, - pipeline.starttime, pipeline.endtime - FROM mom2object as mom - join lofar_pipeline as pipeline on mom.mom2objecttype like "%PIPELINE%" - and mom.id = pipeline.mom2objectid - where mom.mom2id=%s;""" - parameters = (mom_id, mom_id) - rows = self._executeSelectQuery(query, parameters) - - if rows is None or len(rows) == 0: + :return: dict + """ + logger.info("get_trigger_time_restrictions for mom_id: %s", mom_id) + result = {"minStartTime": None, "minDuration": None, "maxDuration": None, "maxEndTime": None, "trigger_id": None} + misc = self._get_misc_contents(mom_id) + if misc is None: raise ValueError("mom_id (%s) not found in MoM database" % mom_id) - - # add timewindow to response, if present - misc_json = rows[0]['misc'] - if misc_json is not None: - misc = json.loads(misc_json) - if 'timeWindow' in misc: - time_restrictions.update(misc['timeWindow']) - - # use mom db duration and startime to fill missing info (all strings) - starttime = rows[0]['starttime'] # datetime - endtime = rows[0]['endtime'] # datetime - duration = rows[0]['duration'] # float - if ('minDuration' not in time_restrictions or 'maxDuration' not in time_restrictions) and duration: - time_restrictions['minDuration'] = time_restrictions['maxDuration'] = str(int(duration)) - if 'minStartTime' not in time_restrictions or 'maxEndTime' not in time_restrictions: - time_formatting = "%Y-%m-%dT%H:%M:%S" - if starttime: - time_restrictions['minStartTime'] = starttime.strftime(time_formatting) - if endtime: - time_restrictions['maxEndTime'] = endtime.strftime(time_formatting) - elif starttime and duration: - max_endtime = starttime + timedelta(seconds=int(duration)) - time_restrictions['maxEndTime'] = max_endtime.strftime(time_formatting) - - if len(time_restrictions) == 0: - raise ValueError("No time restrictions for mom_id (%s) in database" % mom_id) - - logger.info("get_time_restrictions for mom_id (%s): %s", mom_id, time_restrictions) - - return time_restrictions + if 'timeWindow' in misc: + time_window = misc['timeWindow'] + if "trigger_id" in misc: + result["trigger_id"] = misc['trigger_id'] + else: + raise NotImplementedError("TimeWindow specified for a non-triggered observation %s", mom_id) + if "minStartTime" in time_window: + result["minStartTime"] = parseDatetime(time_window["minStartTime"].replace('T', ' ')) # The T is from XML + if "maxDuration" in time_window: + result["maxDuration"] = timedelta(seconds=int(time_window["maxDuration"])) + if "minDuration" in time_window: + result["minDuration"] = timedelta(seconds=int(time_window["minDuration"])) + if "maxEndTime" in time_window: + result["maxEndTime"] = parseDatetime(time_window["maxEndTime"].replace('T', ' ')) # The T is from XML + return result def get_station_selection(self, mom_id): """ @@ -1148,7 +1114,6 @@ where project.mom2id = %s and (project_role.name = "Pi" or project_role.name = " return station_selection - class ProjectDetailsQueryHandler(MessageHandlerInterface): """ handler class for details query in mom db @@ -1184,7 +1149,7 @@ class ProjectDetailsQueryHandler(MessageHandlerInterface): 'GetTaskIdsGraph': self.getTaskIdsGraph, 'GetProjectPrioritiesForObjects': self.get_project_priorities_for_objects, 'GetStationSelection': self.get_station_selection, - 'GetTimeRestrictions': self.get_time_restrictions + 'GetTimeRestrictions': self.get_trigger_time_restrictions } def prepare_loop(self): @@ -1277,8 +1242,8 @@ class ProjectDetailsQueryHandler(MessageHandlerInterface): def getTaskIdsGraph(self, mom2id): return convertIntKeysToString(self.momdb.getTaskIdsGraph(mom2id)) - def get_time_restrictions(self, mom_id): - return self.momdb.get_time_restrictions(mom_id) + def get_trigger_time_restrictions(self, mom_id): + return self.momdb.get_trigger_time_restrictions(mom_id) def get_station_selection(self, mom_id): return self.momdb.get_station_selection(mom_id) diff --git a/SAS/MoM/MoMQueryService/test/t_momqueryservice.py b/SAS/MoM/MoMQueryService/test/t_momqueryservice.py index 7bb5fdcf7a29313f4eab7f340ddc5d50427f8444..711a25377cbd0b6f51c216107f9d20d7e423c7f6 100755 --- a/SAS/MoM/MoMQueryService/test/t_momqueryservice.py +++ b/SAS/MoM/MoMQueryService/test/t_momqueryservice.py @@ -24,6 +24,7 @@ from datetime import datetime from mysql import connector import logging import json +from datetime import datetime, timedelta logger = logging.getLogger(__name__) @@ -518,48 +519,48 @@ class TestProjectDetailsQueryHandler(unittest.TestCase): self.assertEqual(len(return_value['trigger_spec']), len(trigger_specification)) - def test_get_trigger_id_returns_trigger_id_when_mom_wrapper_returns_an_id(self): - trigger_id = 1234 - - self.mom_database_wrapper_mock().get_trigger_id.return_value = trigger_id - - return_value = self.project_details_query_handler.get_trigger_id(5432) - - self.assertEqual(return_value['trigger_id'], trigger_id) - - def test_get_trigger_id_returns_status_ok_when_mom_wrapper_returns_an_id(self): - trigger_id = 1234 - - self.mom_database_wrapper_mock().get_trigger_id.return_value = trigger_id - - return_value = self.project_details_query_handler.get_trigger_id(5432) - - self.assertEqual(return_value['status'], "OK") - - def test_get_trigger_id_returns_status_error_when_mom_wrapper_returns_none(self): - self.mom_database_wrapper_mock().get_trigger_id.return_value = None - - return_value = self.project_details_query_handler.get_trigger_id(5432) - - self.assertEqual(return_value['status'], "Error") - - def test_get_trigger_id_returns_error_when_mom_wrapper_returns_none(self): - mom_id = 5432 - - self.mom_database_wrapper_mock().get_trigger_id.return_value = None - - return_value = self.project_details_query_handler.get_trigger_id(mom_id) - - self.assertEqual(return_value['errors'][0], "No trigger_id for mom_id: " + str(mom_id)) - - def test_get_project_details_returns_author_email(self): - author_email = "author@email.com" - self.mom_database_wrapper_mock().get_project_details.return_value = \ - {"author_email": author_email, "pi_email": "pi@email.com"} - - return_value = self.project_details_query_handler.get_project_details(24343) - - self.assertEqual(return_value["author_email"], author_email) + # def test_get_trigger_id_returns_trigger_id_when_mom_wrapper_returns_an_id(self): + # trigger_id = 1234 + # + # self.mom_database_wrapper_mock().get_trigger_id.return_value = trigger_id + # + # return_value = self.project_details_query_handler.get_trigger_id(5432) + # + # self.assertEqual(return_value['trigger_id'], trigger_id) + # + # def test_get_trigger_id_returns_status_ok_when_mom_wrapper_returns_an_id(self): + # trigger_id = 1234 + # + # self.mom_database_wrapper_mock().get_trigger_id.return_value = trigger_id + # + # return_value = self.project_details_query_handler.get_trigger_id(5432) + # + # self.assertEqual(return_value['status'], "OK") + # + # def test_get_trigger_id_returns_status_error_when_mom_wrapper_returns_none(self): + # self.mom_database_wrapper_mock().get_trigger_id.return_value = None + # + # return_value = self.project_details_query_handler.get_trigger_id(5432) + # + # self.assertEqual(return_value['status'], "Error") + # + # def test_get_trigger_id_returns_error_when_mom_wrapper_returns_none(self): + # mom_id = 5432 + # + # self.mom_database_wrapper_mock().get_trigger_id.return_value = None + # + # return_value = self.project_details_query_handler.get_trigger_id(mom_id) + # + # self.assertEqual(return_value['errors'][0], "No trigger_id for mom_id: " + str(mom_id)) + # + # def test_get_project_details_returns_author_email(self): + # author_email = "author@email.com" + # self.mom_database_wrapper_mock().get_project_details.return_value = \ + # {"author_email": author_email, "pi_email": "pi@email.com"} + # + # return_value = self.project_details_query_handler.get_project_details(24343) + # + # self.assertEqual(return_value["author_email"], author_email) def test_get_project_details_returns_pi_email(self): pi_email = "pi@email.com" @@ -570,22 +571,22 @@ class TestProjectDetailsQueryHandler(unittest.TestCase): self.assertEqual(return_value["pi_email"], pi_email) - def test_get_time_restrictions_returns_what_the_mom_wrapper_returns(self): - min_start_time = "2017-01-01" - max_end_time = "2017-01-02" + def test_get_trigger_time_restrictions_returns_what_the_mom_wrapper_returns(self): + min_start_time = "2017-01-01T12:00:00" + max_end_time = "2017-01-02T01:00:03.0000" min_duration = 300 max_duration = 600 - self.mom_database_wrapper_mock().get_time_restrictions.return_value = \ + self.mom_database_wrapper_mock().get_trigger_time_restrictions.return_value = \ {"minStartTime": min_start_time, "maxEndTime": max_end_time, "minDuration": min_duration, "maxDuration": max_duration} - result = self.project_details_query_handler.get_time_restrictions(1234) + result = self.project_details_query_handler.get_trigger_time_restrictions(1234) - self.assertEqual(result["minStartTime"], min_start_time) - self.assertEqual(result["maxEndTime"], max_end_time) - self.assertEqual(result["minDuration"], min_duration) - self.assertEqual(result["maxDuration"], max_duration) + self.assertEqual(result["minStartTime"], datetime(year=2017, month=1, day=1, hour=12)) + self.assertEqual(result["maxEndTime"], datetime(year=2017, month=1, day=2, hour=1, minute=3)) + self.assertEqual(result["minDuration"], timedelta(seconds=300)) + self.assertEqual(result["maxDuration"], timedelta(seconds=600)) def test_get_station_selection_returns_what_the_mom_wrapper_returns(self): resource_group = "SuperTerp" @@ -1177,48 +1178,48 @@ class TestMomQueryRPC(unittest.TestCase): self.assertEqual(len(result['trigger_spec']), len(trigger_specification)) - @mock.patch('lofar.messaging.messagebus.qpid.messaging') - def test_get_trigger_id_logs_before_query(self, qpid_mock): - self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_id - - mom_id = 6789 - - qpid_mock.Message = QpidMessage - qpid_mock.Connection().session().senders = [self.sender_mock] - qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock - - self.momrpc.get_trigger_id(mom_id) - - self.logger_mock.info.assert_any_call("Requesting GetTriggerId for mom_id: %s", mom_id) - - @mock.patch('lofar.messaging.messagebus.qpid.messaging') - def test_get_trigger_id_logs_after_query(self, qpid_mock): - self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_id - - mom_id = 6789 - - qpid_mock.Message = QpidMessage - qpid_mock.Connection().session().senders = [self.sender_mock] - qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock - - result = self.momrpc.get_trigger_id(mom_id) - - self.logger_mock.info.assert_any_call("Received trigger_id: %s", result) - - @mock.patch('lofar.messaging.messagebus.qpid.messaging') - def test_get_trigger_id_query(self, qpid_mock): - self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_id - - mom_id = 6789 - - qpid_mock.Message = QpidMessage - qpid_mock.Connection().session().senders = [self.sender_mock] - qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock - - result = self.momrpc.get_trigger_id(mom_id) - - self.assertEqual(result["trigger_id"], self.trigger_id) - self.assertEqual(result["status"], "OK") + # @mock.patch('lofar.messaging.messagebus.qpid.messaging') + # def test_get_trigger_id_logs_before_query(self, qpid_mock): + # self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_id + # + # mom_id = 6789 + # + # qpid_mock.Message = QpidMessage + # qpid_mock.Connection().session().senders = [self.sender_mock] + # qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + # + # self.momrpc.get_trigger_id(mom_id) + # + # self.logger_mock.info.assert_any_call("Requesting GetTriggerId for mom_id: %s", mom_id) + # + # @mock.patch('lofar.messaging.messagebus.qpid.messaging') + # def test_get_trigger_id_logs_after_query(self, qpid_mock): + # self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_id + # + # mom_id = 6789 + # + # qpid_mock.Message = QpidMessage + # qpid_mock.Connection().session().senders = [self.sender_mock] + # qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + # + # result = self.momrpc.get_trigger_id(mom_id) + # + # self.logger_mock.info.assert_any_call("Received trigger_id: %s", result) + # + # @mock.patch('lofar.messaging.messagebus.qpid.messaging') + # def test_get_trigger_id_query(self, qpid_mock): + # self.receiver_mock.fetch.return_value = self.qpid_message_get_trigger_id + # + # mom_id = 6789 + # + # qpid_mock.Message = QpidMessage + # qpid_mock.Connection().session().senders = [self.sender_mock] + # qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + # + # result = self.momrpc.get_trigger_id(mom_id) + # + # self.assertEqual(result["trigger_id"], self.trigger_id) + # self.assertEqual(result["status"], "OK") @mock.patch('lofar.messaging.messagebus.qpid.messaging') def test_get_project_details_logs_before_query(self, qpid_mock): @@ -1729,13 +1730,10 @@ class TestMoMDatabaseWrapper(unittest.TestCase): self.mom_database_wrapper.get_station_selection(1234) def test_get_time_restrictions_returns_misc_field_info_from_query_result(self): - min_start_time = u"2017-01-01" - max_end_time = u"2017-01-04" - mom_starttime = u"2017-01-02" - mom_endtime = u"2017-01-03" + min_start_time = u"2017-01-01T12:00:00" + max_end_time = u"2017-01-04T01:00:00" min_duration = 300 max_duration = 600 - mom_duration = 400 timewindow = {u"minStartTime": min_start_time, u"maxEndTime": max_end_time, @@ -1744,38 +1742,37 @@ class TestMoMDatabaseWrapper(unittest.TestCase): expected_result = timewindow details_result = [{u"mom2id": self.mom_id, u"mom2objecttype": self.job_type, - u"misc": json.dumps({u"timeWindow": timewindow}), u"duration": mom_duration, - u"starttime": mom_starttime, u"endtime": mom_endtime}] + u"misc": json.dumps({u"timeWindow": timewindow})}] self.mysql_mock.connect().cursor().fetchall.return_value = details_result - result = self.mom_database_wrapper.get_time_restrictions(self.mom_id) + result = self.mom_database_wrapper.get_trigger_time_restrictions(self.mom_id) self.assertEqual(result, expected_result) - def test_get_time_restrictions_returns_mom_info_if_misc_empty_in_query_result(self): - time_format = "%Y-%m-%dT%H:%M:%S" - mom_starttime = datetime.strptime("2017-01-02T00:00:00", time_format) - mom_endtime = datetime.strptime("2017-01-03T00:00:00", time_format) - mom_duration = 400 - - expected_result = {u"minStartTime": mom_starttime.strftime(time_format), - u"maxEndTime": mom_endtime.strftime(time_format), - u"minDuration": str(mom_duration), - u"maxDuration": str(mom_duration)} - - details_result = [{u"mom2id": self.mom_id, u"mom2objecttype": self.job_type, - u"misc": None, u"duration": mom_duration, - u"starttime": mom_starttime, u"endtime": mom_endtime}] - self.mysql_mock.connect().cursor().fetchall.return_value = details_result - - result = self.mom_database_wrapper.get_time_restrictions(self.mom_id) - self.assertEqual(result, expected_result) + # def test_get_time_restrictions_returns_mom_info_if_misc_empty_in_query_result(self): + # time_format = "%Y-%m-%dT%H:%M:%S" + # mom_starttime = datetime.strptime("2017-01-02T00:00:00", time_format) + # mom_endtime = datetime.strptime("2017-01-03T00:00:00", time_format) + # mom_duration = 400 + # + # expected_result = {u"minStartTime": mom_starttime.strftime(time_format), + # u"maxEndTime": mom_endtime.strftime(time_format), + # u"minDuration": str(mom_duration), + # u"maxDuration": str(mom_duration)} + # + # details_result = [{u"mom2id": self.mom_id, u"mom2objecttype": self.job_type, + # u"misc": None, u"duration": mom_duration, + # u"starttime": mom_starttime, u"endtime": mom_endtime}] + # self.mysql_mock.connect().cursor().fetchall.return_value = details_result + # + # result = self.mom_database_wrapper.get_trigger_time_restrictions(self.mom_id) + # self.assertEqual(result, expected_result) def test_get_time_restrictions_throws_ValueError_on_empty_query_result(self): self.mysql_mock.connect().cursor().fetchall.return_value = [] with self.assertRaises(ValueError): - self.mom_database_wrapper.get_time_restrictions(1234) + self.mom_database_wrapper.get_trigger_time_restrictions(1234) def test_get_time_restrictions_throws_ValueError_if_no_time_restrictions_in_query_result(self): details_result = [{u"mom2id": self.mom_id, u"mom2objecttype": self.job_type, @@ -1784,7 +1781,7 @@ class TestMoMDatabaseWrapper(unittest.TestCase): self.mysql_mock.connect().cursor().fetchall.return_value = details_result with self.assertRaises(ValueError): - self.mom_database_wrapper.get_time_restrictions(1234) + self.mom_database_wrapper.get_trigger_time_restrictions(1234) @unittest.skip("Skipping integration test") @@ -2201,7 +2198,7 @@ class IntegrationTestMoMDatabaseWrapper(unittest.TestCase): def test_get_time_restrictions_throws_ValueError_on_empty_database(self): with self.assertRaises(ValueError): - self.mom_database_wrapper.get_time_restrictions(1234) + self.mom_database_wrapper.get_trigger_time_restrictions(1234) def test_get_time_restrictions_throws_ValueError_if_no_time_restrictions_in_database(self): @@ -2218,7 +2215,7 @@ class IntegrationTestMoMDatabaseWrapper(unittest.TestCase): "16, NULL)") with self.assertRaises(ValueError): - self.mom_database_wrapper.get_time_restrictions(2) + self.mom_database_wrapper.get_trigger_time_restrictions(2) def test_get_time_restrictions_returns_correct_time_restrictions(self): min_start_time = "2017-01-01" @@ -2240,7 +2237,7 @@ class IntegrationTestMoMDatabaseWrapper(unittest.TestCase): "\"maxDuration\": %s}}')" % (min_duration, min_start_time, max_end_time, min_duration, max_duration)) - result = self.mom_database_wrapper.get_time_restrictions(2) + result = self.mom_database_wrapper.get_trigger_time_restrictions(2) self.assertEqual(result["minStartTime"], min_start_time) self.assertEqual(result["maxEndTime"], max_end_time) @@ -2262,7 +2259,7 @@ class IntegrationTestMoMDatabaseWrapper(unittest.TestCase): "NULL, %s, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, NULL, " "16, NULL)" % duration) - result = self.mom_database_wrapper.get_time_restrictions(2) + result = self.mom_database_wrapper.get_trigger_time_restrictions(2) self.assertEqual(result["minDuration"], duration) self.assertEqual(result["maxDuration"], duration) diff --git a/SAS/ResourceAssignment/Common/lib/specification.py b/SAS/ResourceAssignment/Common/lib/specification.py index ad540f58be9efe91654eb51995ea9658f8acf897..4d454a675abf73a748d0b647210b618f60119702 100644 --- a/SAS/ResourceAssignment/Common/lib/specification.py +++ b/SAS/ResourceAssignment/Common/lib/specification.py @@ -40,6 +40,7 @@ OUTPUT_PREFIX = "LOFAR.ObsSW." # refactoring. #TODO There are lot's of hardcoded OTDB key strings and values in here. #TODO Maybe all/most the static methods should log what they're doing and no longer be static? +#TODO We can do more direct updating of OTDB and MoM from here, especially the MoM sytem spec. class Specification: def __init__(self, logger, otdbrpc, momquery, radbrpc): @@ -65,16 +66,17 @@ class Specification: self.internal_dict = {} # TODO: Needs a better name? self.predecessors = [] # list of specification instances self.successor_ids = [] # list of successor Identifiers - self.otdb_id = None # Task Id in OTDB - self.mom_id = None # Task Id in MoM - self.radb_id = None # Task Id in RADB - self.type = None # Task type in RADB - self.subtype = None # Task type in RADB - self.status = None # Task status, as used in OTDB/MoM. + self.otdb_id = None # Task Id in OTDB + self.mom_id = None # Task Id in MoM + self.radb_id = None # Task Id in RADB + self.trigger_id = None # Id of trigger is this was specified in a trigger + self.type = None # Task type in RADB + self.subtype = None # Task type in RADB + self.status = None # Task status, as used in OTDB/MoM. #Inputs for the scheduler self.min_starttime = None - self.max_starttime = None #TODO do we need this one? + #self.max_starttime = None # We return this from calculate_dwell_values self.max_endtime = None self.min_duration = timedelta(seconds = 0) self.max_duration = timedelta(seconds = 0) @@ -96,11 +98,12 @@ class Specification: result["otdb_id"] = self.otdb_id result["mom_id"] = self.mom_id result["task_id"] = self.radb_id + result["trigger_id"] = self.trigger_id result["status"] = self.status result["task_type"] = self.type result["task_subtype"] = self.subtype result["min_starttime"] = str(self.min_starttime) - result["max_starttime"] = str(self.max_starttime) + #result["max_starttime"] = str(self.max_starttime) result["max_endtime"] = str(self.max_endtime) result["min_duration"] = str(self.min_duration) result["max_duration"] = str(self.max_duration) @@ -109,7 +112,7 @@ class Specification: result["duration"] = self.duration.total_seconds() result["cluster"] = self.cluster result["specification"] = self.internal_dict - result["specification"]["Observation.startTime"] = str(self.starttime) #TODO set/update these somewhere else + result["specification"]["Observation.startTime"] = str(self.starttime) #TODO set/update these somewhere else? result["specification"]["Observation.stopTime"] = str(self.endtime) result["predecessors"] = [] for p in self.predecessors: @@ -125,11 +128,12 @@ class Specification: self.otdb_id = input_dict["otdb_id"] self.mom_id = input_dict["mom_id"] self.radb_id = input_dict["task_id"] + self.trigger_id = input_dict["trigger_id"] self.status = input_dict["status"] self.type = input_dict["task_type"] self.subtype = input_dict["task_subtype"] self.min_starttime = parseDatetime(input_dict["min_starttime"]) - self.max_starttime = parseDatetime(input_dict["max_starttime"]) + #self.max_starttime = parseDatetime(input_dict["max_starttime"]) self.endtime = parseDatetime(input_dict["endtime"]) self.duration = timedelta(seconds = input_dict["duration"]) self.min_duration = timedelta(seconds = input_dict["min_duration"]) @@ -159,19 +163,30 @@ class Specification: """"Read specification values from the MoM database, mostly the misc field time restrictions Tries to set min_starttime, max_endtime, min_duration, max_duration, if the Specification has a mom_id example input: - timeWindow": {"minStartTime": "2017-05-23T15:21:44", "maxEndTime": "2017-11-23T15:21:44", "minDuration": 1600, "maxDuration": 7200} - """ - if self.mom_id: #TODO cleanup and make matching with momquery - time_restrictions = self.momquery.get_time_restrictions(self.mom_id) - self.logger.info("Received time restrictions from MoM specs: %s", time_restrictions) - if time_restrictions: - self.min_starttime = parseDatetime(time_restrictions["minStartTime"].replace('T', ' ')) #This might not work for XML - if "minDuration" in time_restrictions: - self.duration = timedelta(seconds=int(time_restrictions["minDuration"])) - if "maxEndTime" in time_restrictions: - max_end_time = parseDatetime(time_restrictions["maxEndTime"].replace('T', ' '))#This might not work for XML - self.end_time = max_end_time - self.max_starttime = self.end_time - self.duration # TODO huh? This doesn't seem to match DwellScheduler specs + """ + if self.mom_id: + # We might need more than the misc field in the future. + # Right now we assume start/end times from OTDB always have priority for example. + try: + misc = self.momquery.get_trigger_time_restrictions(self.mom_id) + self.logger.info("Received misc field from MoM: %s", misc) + if misc: + if "minStartTime" in misc: + self.min_starttime = misc["minStartTime"] + if "minDuration" in misc: + self.max_duration = misc["maxDuration"] + if "maxDuration" in misc: + self.min_duration = misc["minDuration"] + if "maxEndTime" in misc: + self.max_endtime = misc["maxEndTime"] + if "trigger_id" in misc: + self.trigger_id = misc["trigger_id"] + self.logger.info('Found a task mom_id=%s with a trigger_id=%s', self.mom_id, self.trigger_id) + except Exception as e: + self.logger.exception("read_from_mom: " + str(e), exc_info=True) + self.set_status("error") + else: + self.logger.info("This task does not have a mom_id.") # ========================= parset/OTDB related methods ======================================================================= @@ -776,6 +791,7 @@ class Specification: # =========================================== RADB related methods ======================================================= + #TODO we might need read_from_radb_with_predecessors in the future def read_from_radb(self, radb_id): """The returned task is from "SELECT * from resource_allocation.task_view tv" with start/endtime converted to a datetime. This only reads a few values from the RADB, not the whole specification. @@ -783,10 +799,10 @@ class Specification: :return: task dict (radb_id, mom_id, otdb_id, status_id, type_id, specification_id, status, type, starttime, endtime, duration, cluster) or None """ - #TODO maybe this should read the spec as well? Mostly here for backward compatibility, not sure if still needed + #TODO maybe this should read the spec as well? Can especially start/end times be updated outside of here? task = self.radbrpc.getTask(radb_id) # if radb_id is not None else None if task: #TODO what is the exact structure of task see schedulerchecker 47 - self.radb_id = task["id"] + self.radb_id = task["id"] # Should be the same as radb_id, but self.radb_id might not yet be set self.mom_id = task["mom_id"] self.otdb_id = task["otdb_id"] self.status = task["status"] @@ -815,8 +831,8 @@ class Specification: task = self.read_from_radb(self.radb_id) - self._link_predecessors_to_task_in_radb(task) - self._link_successors_to_task_in_radb(task) + self._link_predecessors_to_task_in_radb() + self._link_successors_to_task_in_radb() self.logger.info('Successfully inserted main task and its predecessors and successors into RADB: task=%s', task) @@ -879,49 +895,47 @@ class Specification: self.logger.info('inserted specification (id=%s) and task (id=%s)' % (specification_id, task_id)) return task_id - def _link_predecessors_to_task_in_radb(self, task): + def _link_predecessors_to_task_in_radb(self): """ Links a task to its predecessors in RADB :param task: the task at hand """ #TODO how to keep the predecessors in MoM and in OTDB in sync here? Does it matter? - #FIXME Not sure if this actually works, task['predecessor_ids'] might no longer be set - #TODO refactor for this to use Specification instead of task? - mom_id = task['mom_id'] - - predecessor_ids = self.momquery.getPredecessorIds(mom_id) - if str(mom_id) not in predecessor_ids or not predecessor_ids[str(mom_id)]: - self.logger.info('no predecessors for otdb_id=%s mom_id=%s', task['otdb_id'], mom_id) + predecessor_ids = self.momquery.getPredecessorIds(self.mom_id) + if str(self.mom_id) not in predecessor_ids or not predecessor_ids[str(self.mom_id)]: + self.logger.info('no predecessors for otdb_id=%s mom_id=%s', self.otdb_id, self.mom_id) return - predecessor_mom_ids = predecessor_ids[str(mom_id)] + predecessor_mom_ids = predecessor_ids[str(self.mom_id)] - self.logger.info('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, task['mom_id'], - task['otdb_id']) + self.logger.info('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, self.mom_id, self.otdb_id) for predecessor_mom_id in predecessor_mom_ids: # check if the predecessor needs to be linked to this task predecessor_task = self.radbrpc.getTask(mom_id=predecessor_mom_id) if predecessor_task: - if predecessor_task['id'] not in task['predecessor_ids']: + predecessor_keys = [p.radb_id for p in self.predecessors] + if predecessor_task['id'] not in predecessor_keys: self.logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s ' - 'otdb_id=%s', predecessor_task['mom_id'], predecessor_task['otdb_id'], task['mom_id'], - task['otdb_id']) - self.radbrpc.insertTaskPredecessor(task['id'], predecessor_task['id']) + 'otdb_id=%s', predecessor_task['mom_id'], predecessor_task['otdb_id'], self.mom_id, + self.otdb_id) + spec = Specification(self.logger, self.otdbrpc, self.momquery, self.radbrpc) + spec.read_from_radb(predecessor_task['id']) + self.predecessors.append(spec) # TODO this needs a try/except somewhere? + self.radbrpc.insertTaskPredecessor(self.radb_id, spec.radb_id) else: # Occurs when setting a pipeline to prescheduled while a predecessor has e.g. never been beyond # approved, which is in principle valid. The link in the radb will be made later via processSuccessors() # below. Alternatively, a predecessor could have been deleted. self.logger.warning('could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', - predecessor_mom_id, task['otdb_id']) + predecessor_mom_id, self.otdb_id) - def _link_successors_to_task_in_radb(self, task): + def _link_successors_to_task_in_radb(self): """ Links a task to its successors in RADB """ - #TODO refactor for this to use Specification instead of task? - #FIXME Not usre if this works, as task['successor_ids'] might not be set + #FIXME Not sure if this works, as self.successor_ids might not be set outside of here successor_ids = self.momquery.getSuccessorIds(self.mom_id) if str(self.mom_id) not in successor_ids or not successor_ids[str(self.mom_id)]: @@ -936,14 +950,13 @@ class Specification: # check if the successor needs to be linked to this task successor_task = self.radbrpc.getTask(mom_id=successor_mom_id) if successor_task: - if successor_task['id'] not in task['successor_ids']: + if successor_task['id'] not in self.successor_ids: self.logger.info( 'connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s' - ' otdb_id=%s', successor_task['mom_id'], successor_task['otdb_id'], task['mom_id'], - task['otdb_id'] + ' otdb_id=%s', successor_task['mom_id'], successor_task['otdb_id'], self.mom_id, self.otdb_id ) - - self.radbrpc.insertTaskPredecessor(successor_task['id'], task['id']) + self.successor_ids.append(successor_task['id']) + self.radbrpc.insertTaskPredecessor(successor_task['id'], self.radb_id) movePipelineAfterItsPredecessors(successor_task, self.radbrpc) else: @@ -951,4 +964,4 @@ class Specification: # approved, which is quite normal. The link in the radb will be made later via processPredecessors() # above. Alternatively, a successor could have been deleted. self.logger.warning('could not find successor task with mom_id=%s in radb for task otdb_id=%s', - successor_mom_id, task['otdb_id']) + successor_mom_id, self.otdb_id) diff --git a/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py b/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py index bd303c001f5317b44caf79bbedcfa4701434ba4d..5c6df28339e38f66d2d7065a597e9e5e4616f1a4 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py +++ b/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py @@ -158,6 +158,7 @@ class TaskPrescheduler(OTDBBusListener): spec = Specification(logger, self.otdbrpc, self.momquery, self.radbrpc) spec.status = status spec.read_from_otdb(treeId) + spec.read_from_mom() spec.update_start_end_times() spec.insert_into_radb() # if spec.validate()? @@ -165,20 +166,15 @@ class TaskPrescheduler(OTDBBusListener): return if not spec.mom_id: return - response = self.momquery.get_trigger_id(spec.mom_id) - if response['status'] == 'OK': - logger.info('Found a task mom_id=%s with a trigger_id=%s', spec.mom_id, response['trigger_id']) - - #TODO, check for stations and other resources, start/endtime, target position, then update specification - - logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', spec.otdb_id, response['trigger_id']) + if spec.trigger_id: + logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', spec.otdb_id, spec.trigger_id) otdb_info = {} if spec.isObservation(): cobalt_values = calculateCobaltSettings(spec) otdb_info.update(cobaltOTDBsettings(cobalt_values)) self.setOTDBinfo(spec.otdb_id, otdb_info, 'prescheduled') else: - logger.info('Did not find a trigger for task mom_id=%s, because %s', spec.mom_id, response['errors']) + logger.info('Did not find a trigger for task mom_id=%s, because %s', spec.mom_id) def setOTDBinfo(self, otdb_id, otdb_info, otdb_status): """This function sets the values in otdb_info in OTDB, almost a copy from the RAtoOTDBPropagator"""