diff --git a/SAS/TMSS/src/tmss/exceptions.py b/SAS/TMSS/src/tmss/exceptions.py index a320dbd527a5a58a0d7274836beb66f9f5387c1c..018622ec7aa35dc6af7727fe8852013779baaf45 100644 --- a/SAS/TMSS/src/tmss/exceptions.py +++ b/SAS/TMSS/src/tmss/exceptions.py @@ -14,6 +14,9 @@ class BlueprintCreationException(ConversionException): class SubtaskCreationException(ConversionException): pass +class SubtaskException(TMSSException): + pass + class SchedulingException(TMSSException): pass diff --git a/SAS/TMSS/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/src/tmss/tmssapp/subtasks.py index a17c633eddc51a91df401c756e1f00bdfc200942..51f1e82635dd216f978e61539536cb9d98e92f92 100644 --- a/SAS/TMSS/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/src/tmss/tmssapp/subtasks.py @@ -9,7 +9,7 @@ from lofar.common import isProductionEnvironment from lofar.common.json_utils import add_defaults_to_json_object_for_schema, get_default_json_object_for_schema from lofar.common.lcu_utils import get_current_stations -from lofar.sas.tmss.tmss.exceptions import SubtaskCreationException, SubtaskSchedulingException +from lofar.sas.tmss.tmss.exceptions import SubtaskCreationException, SubtaskSchedulingException, SubtaskException from datetime import datetime, timedelta from lofar.common.datetimeutils import parseDatetime @@ -583,22 +583,23 @@ def _assign_resources(subtask: Subtask): pass assigned = False with RARPC.create() as rarpc: - # provide an maximum to iterate to avoid infinite loop + # provide an maximum to iterate to avoid infinite loop? while not assigned: try: assigned = rarpc.do_assignment(ra_spec) except ScheduleException as e: - print("Conflicts in schedule, lets check the stations in conflict") + logger.info("Conflicts in assignment detected, lets check the stations in conflict and re-assign if possible") except Exception as e: print("This is another Exception which does not lead to retry assignment") raise e - # If not not assigned + # Try to re-assign if not assigned yet if not assigned: lst_stations_in_conflict = get_stations_in_conflict(subtask.id) lst_stations = determine_stations_which_can_be_assigned(subtask, lst_stations_in_conflict) ra_spec = update_specification(ra_spec, lst_stations) - # At the end still not possible to assign, give Exception and show which stations are in conflict + # At the end still not possible to assign, give Exception normally an Exception should already have been + # raised during the re-assignment if not assigned: raise SubtaskSchedulingException("Cannot schedule subtask id=%d because the required resources are not (fully) available." % (subtask.pk, )) @@ -643,21 +644,54 @@ def determine_stations_which_can_be_assigned(subtask, lst_stations_in_conflict): """ Determine which stations can be assigned when conflict of stations are occurred Station in conflict should be removed. - Use the max_missing from the task specifications and the conflicted station list to create a station list - which should be possible to assign. If that is not possible then raise an SubtaskSchedulingException - with correct context - + Use the max_nr_missing from the task specifications and the conflicted station list to create a station list + which should be possible to assign. If the number of max missing in a station group is larger than the station + to be skipped, then new assignment is not possible so raise an SubtaskSchedulingException with context :param subtask: :param lst_stations_in_conflict: :return: lst_stations: List of station which can be assigned """ - # get the station list from specification + # Get the station list from specification and remove the conflict stations lst_specified_stations = subtask.specifications_doc["stations"]["station_list"] lst_stations = list(set(lst_specified_stations) - set(lst_stations_in_conflict)) - print("determine_stations_which_can_be_assigned %s" % lst_stations) + logger.info("Determine stations which can be assigned %s" % lst_stations) + + # Check whether the removing of the conflict station the requirements of max_nr_missing per station_group is + # still fulfilled. If that is OK then we are done otherwise we will raise an Exception + stations_groups = get_station_groups(subtask) + for sg in stations_groups: + nbr_missing = len(set(sg["stations"]) & set(lst_stations_in_conflict)) + if nbr_missing > sg["max_nr_missing"]: + raise SubtaskSchedulingException("There are more stations in conflict than the specification is given " + "(%d is larger than %d). The stations that are in conflict are '%s'." + "Please check station of subtask %d " % + (nbr_missing, sg["max_nr_missing"], lst_stations_in_conflict, subtask.pk)) return lst_stations +def get_station_groups(subtask): + """ + Retrieve the stations_group specifications of the given subtask + Need to retrieve it from (related) Target Observation Task + Note list can be empty (some testcase) which result in no checking max_nr_missing + :param subtask: + :return: station_groups which is a list of dict. { station_list, max_nr_missing } + """ + station_groups = [] + if 'calibrator' in subtask.task_blueprint.specifications_template.name.lower(): + # Calibrator requires related Target Task Observation for some specifications + target_task_blueprint = get_related_target_observation_task_blueprint(subtask.task_blueprint) + if target_task_blueprint is None: + raise SubtaskException("Cannot retrieve related target observation of task_blueprint %d (subtask %d)" % + (subtask.task_blueprint.id, subtask.id)) + if "station_groups" in target_task_blueprint.specifications_doc.keys(): + station_groups = target_task_blueprint.specifications_doc["station_groups"] + else: + if "station_groups" in subtask.task_blueprint.specifications_doc.keys(): + station_groups = subtask.task_blueprint.specifications_doc["station_groups"] + return station_groups + + def update_specification(ra_spec, lst_stations): """ Update the RA Specification dictionary with the correct list of stations @@ -668,7 +702,6 @@ def update_specification(ra_spec, lst_stations): if len(lst_stations) == 0: raise SubtaskSchedulingException("Cannot re-assign resources after conflict for subtask id=%d " "because there are no stations left to assign. " % ra_spec["tmss_id"]) - updated_ra_spec = ra_spec updated_ra_spec["specification"]["Observation.VirtualInstrument.stationList"] = "[%s]" % ','.join(s for s in lst_stations) # ?? should the station_requirements also be updated or just leave that empty '[]' diff --git a/SAS/TMSS/test/CMakeLists.txt b/SAS/TMSS/test/CMakeLists.txt index 716469c7ca9294350badc60448fc92870eb6be8e..88afc5af2ea0b9a03c4d51b489346202877781eb 100644 --- a/SAS/TMSS/test/CMakeLists.txt +++ b/SAS/TMSS/test/CMakeLists.txt @@ -37,6 +37,7 @@ if(BUILD_TESTING) # To get ctest running file(COPY testdata DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) + set_tests_properties(t_scheduling TIMEOUT 180) set_tests_properties(t_tmssapp_scheduling_REST_API PROPERTIES TIMEOUT 300) set_tests_properties(t_tmssapp_specification_REST_API PROPERTIES TIMEOUT 360) endif() diff --git a/SAS/TMSS/test/t_scheduling.py b/SAS/TMSS/test/t_scheduling.py index 4cd7da8de44b3050ec60e1b3bbde7890ac0651d8..b4b649a4b362e5498f4935df7cd5acde13a49f19 100755 --- a/SAS/TMSS/test/t_scheduling.py +++ b/SAS/TMSS/test/t_scheduling.py @@ -79,6 +79,27 @@ def create_subtask_object_for_testing(subtask_type_value, subtask_state_value): return models.Subtask.objects.create(**subtask_data) +def create_reserved_stations_for_testing(station_list): + """ + Helper function to create stations in reservation, in other words assigned in Resource Assigner + :param station_list: List of station names to assign + """ + with RARPC.create() as rarpc: + ra_spec = {'task_type': 'reservation', + 'task_subtype': 'maintenance', + 'status': 'prescheduled', + 'starttime': datetime.utcnow() - timedelta(hours=1), + 'endtime': datetime.utcnow() + timedelta(hours=1), + 'cluster': None, + 'specification': {}} + inner_spec = {'Observation.VirtualInstrument.stationList': station_list, + 'Observation.startTime': '2020-01-08 06:30:00', + 'Observation.endTime': '2021-07-08 06:30:00'} + ra_spec['specification'] = inner_spec + assigned = rarpc.do_assignment(ra_spec) + return assigned + + class SchedulingTest(unittest.TestCase): def setUp(self): # clean all specs/tasks/claims in RADB (cascading delete) @@ -107,28 +128,51 @@ class SchedulingTest(unittest.TestCase): self.assertEqual('scheduled', subtask['state_value']) self.assertEqual('scheduled', ra_test_env.radb.getTask(tmss_id=subtask_id)['status']) - def test_schedule_observation_subtask_with_blocking_reservations_failed(self): + def test_schedule_observation_subtask_with_one_blocking_reservation_failed(self): + """ + Set (Resource Assigner) station CS001 to reserved + Schedule subtask with station CS001 + Check if schedule of the subtask fail + """ + self.assertTrue(create_reserved_stations_for_testing(['CS001'])) - # create a reservation on station CS001 - with RARPC.create() as rarpc: - ra_spec = { 'task_type': 'reservation', - 'task_subtype': 'maintenance', - 'status': 'prescheduled', - 'starttime': datetime.utcnow()-timedelta(hours=1), - 'endtime': datetime.utcnow() + timedelta(hours=1), - 'cluster': None, - 'specification': {} } - inner_spec = { 'Observation.VirtualInstrument.stationList': ['CS001'], - 'Observation.startTime': '2020-01-08 06:30:00', - 'Observation.endTime': '2021-07-08 06:30:00' } - ra_spec['specification'] = inner_spec - assigned = rarpc.do_assignment(ra_spec) - self.assertTrue(assigned) + with tmss_test_env.create_tmss_client() as client: + subtask_template = client.get_subtask_template("observation control") + spec = get_default_json_object_for_schema(subtask_template['schema']) + spec['stations']['digital_pointings'][0]['subbands'] = [0] + cluster_url = client.get_path_as_json_object('/cluster/1')['url'] + + subtask_data = test_data_creator.Subtask(specifications_template_url=subtask_template['url'], + specifications_doc=spec, + cluster_url=cluster_url, + task_blueprint_url=test_data_creator.post_data_and_get_url(test_data_creator.TaskBlueprint(), '/task_blueprint/')) + subtask = test_data_creator.post_data_and_get_response_as_json_object(subtask_data, '/subtask/') + subtask_id = subtask['id'] + test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=subtask['url']), '/subtask_output/') + + client.set_subtask_status(subtask_id, 'defined') + + with self.assertRaises(Exception): + client.schedule_subtask(subtask_id) + + subtask = client.get_subtask(subtask_id) + self.assertEqual('error', subtask['state_value']) + self.assertEqual('conflict', ra_test_env.radb.getTask(tmss_id=subtask_id)['status']) + + def test_schedule_observation_subtask_with_blocking_reservations_failed(self): + """ + Set (Resource Assigner) station CS001, CS002, CS401, CS501 to reserved + Schedule subtask with stations CS001, CS002, CS401 + Check if schedule of the subtask fail + """ + self.assertTrue(create_reserved_stations_for_testing(['CS001','CS002','CS501','CS401' ])) with tmss_test_env.create_tmss_client() as client: subtask_template = client.get_subtask_template("observation control") spec = get_default_json_object_for_schema(subtask_template['schema']) spec['stations']['digital_pointings'][0]['subbands'] = [0] + spec['stations']['station_list'] = ['CS001', 'CS002', 'CS401'] + cluster_url = client.get_path_as_json_object('/cluster/1')['url'] subtask_data = test_data_creator.Subtask(specifications_template_url=subtask_template['url'], @@ -149,21 +193,12 @@ class SchedulingTest(unittest.TestCase): self.assertEqual('conflict', ra_test_env.radb.getTask(tmss_id=subtask_id)['status']) def test_schedule_observation_subtask_with_blocking_reservation_ok(self): - # create a reservation on station CS001 - with RARPC.create() as rarpc: - ra_spec = {'task_type': 'reservation', - 'task_subtype': 'maintenance', - 'status': 'prescheduled', - 'starttime': datetime.utcnow() - timedelta(hours=1), - 'endtime': datetime.utcnow() + timedelta(hours=1), - 'cluster': None, - 'specification': {}} - inner_spec = {'Observation.VirtualInstrument.stationList': ['CS001'], - 'Observation.startTime': '2020-01-08 06:30:00', - 'Observation.endTime': '2021-07-08 06:30:00'} - ra_spec['specification'] = inner_spec - assigned = rarpc.do_assignment(ra_spec) - self.assertTrue(assigned) + """ + Set (Resource Assigner) station CS001 to reserved + Schedule subtask with station CS001, CS002, CS003 + Check if schedule of the subtasks do not fail (it can schedule with station CS002 and CS003) + """ + self.assertTrue(create_reserved_stations_for_testing(['CS001','CS003'])) with tmss_test_env.create_tmss_client() as client: subtask_template = client.get_subtask_template("observation control") @@ -186,8 +221,6 @@ class SchedulingTest(unittest.TestCase): subtask = client.schedule_subtask(subtask_id) self.assertEqual('scheduled', subtask['state_value']) self.assertEqual('scheduled', ra_test_env.radb.getTask(tmss_id=subtask_id)['status']) - # Check which stations are used should be CS002 and CS003 - def test_schedule_pipeline_subtask_with_enough_resources_available(self): with tmss_test_env.create_tmss_client() as client: @@ -491,6 +524,31 @@ class TestWithUC1Specifications(unittest.TestCase): cls.scheduling_unit_blueprints = scheduling_unit_draft.scheduling_unit_blueprints.all() cls.scheduling_unit_blueprint = cls.scheduling_unit_blueprints[0] cls.task_blueprints = cls.scheduling_unit_blueprint.task_blueprints.all() + # SubtaskId of the first observation subtask + observation_tbp = list(tb for tb in list(cls.task_blueprints) if tb.specifications_template.type.value == TaskType.Choices.OBSERVATION.value) + observation_tbp.sort(key=lambda tb: tb.relative_start_time) + cls.subtask_id_of_first_observation = list(st for st in observation_tbp[0].subtasks.all() + if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value)[0].id + + def setUp(self): + # clean all specs/tasks/claims in RADB (cascading delete) + for spec in ra_test_env.radb.getSpecifications(): + ra_test_env.radb.deleteSpecification(spec['id']) + # Set subtask back to 'defined' (and no start/stoptime) + for tb in self.task_blueprints: + for subtask in tb.subtasks.all(): + subtask.state = models.SubtaskState.objects.get(value="defined") + subtask.stop_time = None + subtask.start_time = None + subtask.save() + + def _schedule_subtask_with_failure(self, station_reserved): + with tmss_test_env.create_tmss_client() as client: + with self.assertRaises(Exception) as context: + client.schedule_subtask(self.subtask_id_of_first_observation) + self.assertTrue("There are more stations in conflict than the specification is given" in str(context.exception).lower()) + for station in station_reserved: + self.assertTrue(station in str(context.exception).lower()) def test_create_task_blueprints_and_subtasks_from_scheduling_unit_draft(self): """ @@ -562,6 +620,66 @@ class TestWithUC1Specifications(unittest.TestCase): self.assertEqual(timedelta(0), task_blueprint.relative_start_time) self.assertEqual(timedelta(0), task_blueprint.relative_stop_time) + def test_dutch_stations_conflicts_exception(self): + """ + Test conflict of 'Dutch' station which are have a default of max_nr_missing=4, + Assign stations equal to max_nr_missing+1 before schedule it and check if it can NOT be scheduled + Check the context of the Exception + """ + station_reserved = ['CS002', 'CS003', 'CS004', 'CS401', 'CS501'] + self.assertTrue(create_reserved_stations_for_testing(station_reserved)) + self._schedule_subtask_with_failure(station_reserved) + + def test_dutch_stations_conflicts_ok(self): + """ + Test conflict of 'Dutch' station which are have a default of max_nr_missing=4, + Assign stations equal to max_nr_missing before schedule it and check if it can be scheduled + """ + station_reserved = ['CS002', 'CS003', 'CS004', 'CS401'] + self.assertTrue(create_reserved_stations_for_testing(station_reserved)) + with tmss_test_env.create_tmss_client() as client: + client.schedule_subtask(self.subtask_id_of_first_observation) + + def test_international_stations_conflicts_failed(self): + """ + Test conflict of 'International' stations which are have a default of max_nr_missing=2, + Assign stations equal to max_nr_missing+1 before schedule it and check if it can NOT be scheduled + Check the context of the Exception + """ + station_reserved = ['SE607', 'PL610', 'PL612'] + self.assertTrue(create_reserved_stations_for_testing(station_reserved)) + self._schedule_subtask_with_failure(station_reserved) + + def test_international_stations_conflicts_ok(self): + """ + Test conflict of 'International' stations which are have a default of max_nr_missing=2, + Assign stations equal to max_nr_missing before schedule it and check if it can be scheduled + """ + station_reserved = ['SE607', 'PL612'] + self.assertTrue(create_reserved_stations_for_testing(station_reserved)) + with tmss_test_env.create_tmss_client() as client: + client.schedule_subtask(self.subtask_id_of_first_observation) + + def test_international_required_stations_conflicts_failed(self): + """ + Test conflict of 'International Required' stations which are have a default of max_nr_missing=1, + Assign stations equal to max_nr_missing+1 before schedule it and check if it can NOT be scheduled + Check the context of the Exception + """ + station_reserved = ['DE601', 'DE605'] + self.assertTrue(create_reserved_stations_for_testing(station_reserved)) + self._schedule_subtask_with_failure(station_reserved) + + def test_international_required_stations_conflicts_ok(self): + """ + Test conflict of 'International Required' stations which are have a default of max_nr_missing=1, + Assign stations equal to max_nr_missing before schedule it and check if it can be scheduled + """ + station_reserved = ['DE605'] + self.assertTrue(create_reserved_stations_for_testing(station_reserved)) + with tmss_test_env.create_tmss_client() as client: + client.schedule_subtask(self.subtask_id_of_first_observation) + if __name__ == "__main__": os.environ['TZ'] = 'UTC'