diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index a4fba23eab7c4a5be4ccc0cf9485dcc0b36ee672..413c28a1f1bfb9f529d2c335ddec662a0311aeb4 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -996,11 +996,11 @@ def _create_ra_specification(_subtask): def assign_or_unassign_resources(subtask: Subtask): """ + Assign/unassign resources for subtasks. If resources are not available or they do not meet requirements, + a SubtaskSchedulingException is raised. :param subtask: """ - MAX_NBR_ASSIGNMENTS = 10 - if subtask.specifications_template.type.value not in (SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value): raise SubtaskSchedulingException("Cannot assign/unassign resources for subtask id=%d because it is not an observation/pipeline. type=%s" % (subtask.pk, subtask.specifications_template.type.value)) @@ -1016,29 +1016,38 @@ def assign_or_unassign_resources(subtask: Subtask): except: pass - #TODO: rewrite the code below. Goal is to take out stations which cannot be used. Accept if sufficient stations available, else raise. Only do this for observation subtasks. + # Reason about stations only for observations with a station list + if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value and \ + "stations" in subtask.specifications_doc and "station_list" in subtask.specifications_doc["stations"]: + _do_assignment_for_observations_with_required_station_check(subtask, ra_spec) + else: + with RARPC.create() as rarpc: + try: + rarpc.do_assignment(ra_spec) + except ScheduleException as e: + raise SubtaskSchedulingException("Cannot schedule/unschedule subtask id=%d. The required resources are not (fully) available." % subtask.pk) + + +def _do_assignment_for_observations_with_required_station_check(subtask: Subtask, ra_spec) -> bool: + """ + Try to detect conflicts and re-assign if possible. + :param subtask: + :param ra_spec: + """ assigned = False - cnt_do_assignments = 1 with RARPC.create() as rarpc: - while not assigned and cnt_do_assignments < MAX_NBR_ASSIGNMENTS: + # Try to re-assign till it succeeds. If the requirements are not met, an exception will be raised. + while not assigned: try: - cnt_do_assignments += 1 assigned = rarpc.do_assignment(ra_spec) except ScheduleException as e: - logger.info("Conflicts in assignment detected, lets check the stations in conflict and re-assign if possible") - # Try to re-assign if not assigned yet + logger.exception(e) if not assigned: - # only reason about stations when this is an observation with a station_list - if "stations" in subtask.specifications_doc and "station_list" in subtask.specifications_doc["stations"]: - lst_stations_in_conflict = get_stations_in_conflict(subtask.id) - lst_stations = determine_stations_which_can_be_assigned(subtask, lst_stations_in_conflict) - ra_spec = update_specification(ra_spec, lst_stations) - - # At the end still not possible to assign, give Exception. - if not assigned: - raise SubtaskSchedulingException("Cannot schedule/unschedule subtask id=%d within %d number of attempts. " - "The required resources are not (fully) available." % (subtask.pk, cnt_do_assignments)) - + logger.info("Conflicts in assignment detected, checking stations in conflict and re-assign if possible") + lst_stations_in_conflict = get_stations_in_conflict(subtask.id) + lst_stations = determine_stations_which_can_be_assigned(subtask, lst_stations_in_conflict) + ra_spec = update_specification(ra_spec, lst_stations) + return assigned def get_stations_in_conflict(subtask_id): """ diff --git a/SAS/TMSS/backend/test/t_scheduling.py b/SAS/TMSS/backend/test/t_scheduling.py index 2c124948c7985d29c9ba32fbff10e6d209be1d5c..8bd29da568e0ee999915ec0cc1022d40a95a3dd5 100755 --- a/SAS/TMSS/backend/test/t_scheduling.py +++ b/SAS/TMSS/backend/test/t_scheduling.py @@ -145,14 +145,14 @@ class SchedulingTest(unittest.TestCase): def _create_target_observation_subtask(specification_doc: dict=None) -> dict: '''create a target observation subtask in defined state and return the subtask as json dict. if the given specification_doc is None, then the defaults are used.''' + if specification_doc is None: + specification_doc = {'stations': {'digital_pointings': [{'name': 'target0', 'subbands': [0]}], 'station_list': ['CS001', 'CS002', 'CS003']}} + with tmss_test_env.create_tmss_client() as client: task_blueprint_data = test_data_creator.TaskBlueprint(template_url=client.get_task_template(name="target observation")['url']) task_blueprint_data['specifications_doc']['SAPs'][0]['name'] = specification_doc['stations']['digital_pointings'][0]['name'] task_blueprint = test_data_creator.post_data_and_get_response_as_json_object(task_blueprint_data, '/task_blueprint/') - if specification_doc is None: - specification_doc = {} - subtask_template = client.get_subtask_template("observation control") specification_doc = add_defaults_to_json_object_for_schema(specification_doc, subtask_template['schema']) cluster_url = client.get_path_as_json_object('/cluster/1')['url'] @@ -350,9 +350,9 @@ class SchedulingTest(unittest.TestCase): def test_schedule_observation_subtask_with_blocking_reservation_ok(self): """ - Set (Resource Assigner) station CS001 to reserved + Set (Resource Assigner) station CS001, CS003 to reserved Schedule subtask with station CS001, CS002, CS003 - Check if schedule of the subtasks do not fail (it can schedule with station CS002 and CS003) + Check if schedule of the subtasks do not fail (it can schedule with station CS002) """ self.assertTrue(create_reserved_stations_for_testing(['CS001','CS003']))