diff --git a/SAS/TMSS/backend/services/scheduling/lib/subtask_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/subtask_scheduling.py index d4e94c8b3c326be81eb5dac97efc48d50858f870..d6909516fe3c6bf2417c382ec7f1322923b8744c 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/subtask_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/lib/subtask_scheduling.py @@ -73,10 +73,15 @@ class TMSSSubTaskSchedulingEventMessageHandler(TMSSEventMessageHandler): logger.info("skipping scheduling of successor subtask %s for finished subtask %s because not all its other predecessor subtasks are finished", suc_subtask_id, id) else: logger.info("trying to schedule successor subtask %s for finished subtask %s", suc_subtask_id, id) - scheduled_successor = self.tmss_client.try_schedule_subtask(suc_subtask_id) - # TODO TMSS-483: Check if ingest subtask is allowed to be scheduled, or if we are waiting for the IngestPermissionGranted event. + # try scheduling the subtask. + # if it succeeds, then the state will be 'scheduled' afterwards + # if there is a specification error, then the state will be 'error' afterwards + # if there is another kind of error (like needing ingest-permission), then the state will be 'defined' afterwards, so you can retry. + # for the ingest-permission we will retry automatically when that permission is granted + scheduled_successor = self.tmss_client.schedule_subtask(suc_subtask_id) suc_subtask_state = scheduled_successor['state_value'] - logger.info("successor subtask %s for finished subtask %s now has state '%s', see %s", suc_subtask_id, id, suc_subtask_state, scheduled_successor['url']) + logger.log(logging.INFO if suc_subtask_state=='scheduled' else logging.WARNING, + "successor subtask %s for finished subtask %s now has state '%s', see %s", suc_subtask_id, id, suc_subtask_state, scheduled_successor['url']) else: logger.warning("skipping scheduling of successor subtask %s for finished subtask %s because its state is '%s'", suc_subtask_id, id, suc_subtask_state) @@ -92,7 +97,7 @@ class TMSSSubTaskSchedulingEventMessageHandler(TMSSEventMessageHandler): subtask_template = self.tmss_client.get_url_as_json_object(subtask['specifications_template']) if subtask_template['type_value'] == 'ingest': logger.info("trying to schedule ingest subtask id=%s for scheduling_unit_blueprint id=%s...", subtask['id'], id) - self.tmss_client.try_schedule_subtask(subtask['id']) + self.tmss_client.schedule_subtask(subtask['id']) def create_subtask_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER, tmss_client_credentials_id: str=None): diff --git a/SAS/TMSS/backend/src/tmss/exceptions.py b/SAS/TMSS/backend/src/tmss/exceptions.py index e95a88aac1dd5ee69477320a8c83e13adf860bd8..82784f607a942acfc3874ee77252b849839f4170 100644 --- a/SAS/TMSS/backend/src/tmss/exceptions.py +++ b/SAS/TMSS/backend/src/tmss/exceptions.py @@ -26,6 +26,9 @@ class SchedulingException(TMSSException): class SubtaskSchedulingException(SubtaskException, SchedulingException): pass +class SubtaskSchedulingSpecificationException(SubtaskSchedulingException): + pass + class TaskSchedulingException(SchedulingException): pass diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 6f6303bc18d2c06f730a9a1384db65f445ab3d0f..73bbeaee8b8312b9b2b05bd6bb0bad652ca155d1 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -527,9 +527,14 @@ def schedule_subtask(subtask: Subtask) -> Subtask: try: logger.exception(e) - # set the subtask to state 'ERROR'... - subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value) - subtask.save() + if isinstance(e, SubtaskSchedulingSpecificationException): + # set the subtask to state 'ERROR' in case of a specification exception + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value) + subtask.save() + elif subtask.state == SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value): + # set the subtask back to state 'DEFINED' to allow the user/system to retry later + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) + subtask.save() except Exception as e2: logger.error(e2) finally: @@ -783,8 +788,8 @@ def update_specification(ra_spec, lst_stations): :return: Dictionary with updated RA specification """ if len(lst_stations) == 0: - raise SubtaskSchedulingException("Cannot re-assign resources after conflict for subtask id=%d " - "because there are no stations left to assign. " % ra_spec["tmss_id"]) + raise SubtaskSchedulingSpecificationException("Cannot re-assign resources after conflict for subtask id=%d " + "because there are no stations left to assign. " % ra_spec["tmss_id"]) updated_ra_spec = ra_spec updated_ra_spec["specification"]["Observation.VirtualInstrument.stationList"] = "[%s]" % ','.join(s for s in lst_stations) # ?? should the station_requirements also be updated or just leave that empty '[]' assume for now it can be empty @@ -853,12 +858,12 @@ def schedule_qaplots_subtask(qaplots_subtask: Subtask): check_prerequities_for_scheduling(qaplots_subtask) if qaplots_subtask.specifications_template.type.value != SubtaskType.Choices.QA_PLOTS.value: - raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (qaplots_subtask.pk, + raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (qaplots_subtask.pk, qaplots_subtask.specifications_template.type, SubtaskType.Choices.QA_PLOTS.value)) if len(qaplots_subtask.inputs.all()) != 1: - raise SubtaskSchedulingException("QA subtask id=%s should have 1 input, but it has %s" % (qaplots_subtask.id, len(qaplots_subtask.inputs))) + raise SubtaskSchedulingSpecificationException("QA subtask id=%s should have 1 input, but it has %s" % (qaplots_subtask.id, len(qaplots_subtask.inputs))) # step 1: set state to SCHEDULING qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value) @@ -931,9 +936,9 @@ def schedule_observation_subtask(observation_subtask: Subtask): check_prerequities_for_scheduling(observation_subtask) if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value: - raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (observation_subtask.pk, - observation_subtask.specifications_template.type, - SubtaskType.Choices.OBSERVATION.value)) + raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (observation_subtask.pk, + observation_subtask.specifications_template.type, + SubtaskType.Choices.OBSERVATION.value)) # step 1: set state to SCHEDULING observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value) @@ -942,13 +947,13 @@ def schedule_observation_subtask(observation_subtask: Subtask): # step 1a: check start/stop times # start time should be known. If not raise. Then the user and/or scheduling service should supply a properly calculated/estimated start_time first. if observation_subtask.start_time is None: - raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no start_time" % (observation_subtask.pk, - observation_subtask.specifications_template.type)) + raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because it has no start_time" % (observation_subtask.pk, + observation_subtask.specifications_template.type)) if observation_subtask.specified_duration < timedelta(seconds=1): - raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because its specified duration is too short: %s" % (observation_subtask.pk, - observation_subtask.specifications_template.type, - observation_subtask.specified_duration)) + raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because its specified duration is too short: %s" % (observation_subtask.pk, + observation_subtask.specifications_template.type, + observation_subtask.specified_duration)) # always update the stop_time according to the spec observation_subtask.stop_time = observation_subtask.start_time + observation_subtask.specified_duration @@ -1022,9 +1027,9 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask): check_prerequities_for_scheduling(pipeline_subtask) if pipeline_subtask.specifications_template.type.value != SubtaskType.Choices.PIPELINE.value: - raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (pipeline_subtask.pk, - pipeline_subtask.specifications_template.type, - SubtaskType.Choices.PIPELINE.value)) + raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (pipeline_subtask.pk, + pipeline_subtask.specifications_template.type, + SubtaskType.Choices.PIPELINE.value)) # step 1: set state to SCHEDULING pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value) @@ -1038,17 +1043,17 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask): pipeline_subtask.start_time = now if pipeline_subtask.specified_duration < timedelta(seconds=1): - raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because its specified duration is too short: %s" % (pipeline_subtask.pk, - pipeline_subtask.specifications_template.type, - pipeline_subtask.specified_duration)) + raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because its specified duration is too short: %s" % (pipeline_subtask.pk, + pipeline_subtask.specifications_template.type, + pipeline_subtask.specified_duration)) # always update the stop_time according to the spec pipeline_subtask.stop_time = pipeline_subtask.start_time + pipeline_subtask.specified_duration # step 2: link input dataproducts if pipeline_subtask.inputs.count() == 0: - raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (pipeline_subtask.pk, - pipeline_subtask.specifications_template.type)) + raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (pipeline_subtask.pk, + pipeline_subtask.specifications_template.type)) # TODO: use existing and reasonable selection and specification templates for output when we have those, for now, use "empty" dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="empty") @@ -1116,9 +1121,13 @@ def schedule_ingest_subtask(ingest_subtask: Subtask): check_prerequities_for_scheduling(ingest_subtask) if ingest_subtask.specifications_template.type.value != SubtaskType.Choices.INGEST.value: - raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (ingest_subtask.pk, - ingest_subtask.specifications_template.type, - SubtaskType.Choices.INGEST.value)) + raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (ingest_subtask.pk, + ingest_subtask.specifications_template.type, + SubtaskType.Choices.INGEST.value)) + + # step 1: set state to SCHEDULING + ingest_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value) + ingest_subtask.save() # check permission pre-requisites scheduling_unit_blueprint = ingest_subtask.task_blueprint.scheduling_unit_blueprint @@ -1126,10 +1135,6 @@ def schedule_ingest_subtask(ingest_subtask: Subtask): if scheduling_unit_blueprint.ingest_permission_granted_since is None or scheduling_unit_blueprint.ingest_permission_granted_since > datetime.utcnow(): raise SubtaskSchedulingException("Cannot schedule ingest subtask id=%d because it requires explicit permission and the permission has not been granted (yet)" % (ingest_subtask.pk,)) - # step 1: set state to SCHEDULING - ingest_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value) - ingest_subtask.save() - # step 1a: set start/stop times # not very relevant for ingest subtasks, but it's nice for the user to see when the ingest task was scheduled. # please note that an ingest subtask may idle for some time while it is in the ingest queue. diff --git a/SAS/TMSS/backend/test/t_scheduling.py b/SAS/TMSS/backend/test/t_scheduling.py index 73892be8d0b9f8c95f1fa9aa0c0e28a2f39b85ef..ac067a9097226d0cc39951e6e8e277ba78603ab9 100755 --- a/SAS/TMSS/backend/test/t_scheduling.py +++ b/SAS/TMSS/backend/test/t_scheduling.py @@ -102,6 +102,15 @@ class SchedulingTest(unittest.TestCase): for spec in tmss_test_env.ra_test_environment.radb.getSpecifications(): tmss_test_env.ra_test_environment.radb.deleteSpecification(spec['id']) + DataproductTransform.objects.all().delete() + Dataproduct.objects.all().delete() + SubtaskInput.objects.all().delete() + SubtaskOutput.objects.all().delete() + Subtask.objects.all().delete() + + test_data_creator.wipe_cache() + + def test_schedule_observation_subtask_with_enough_resources_available(self): with tmss_test_env.create_tmss_client() as client: task_blueprint_data = test_data_creator.TaskBlueprint(template_url=client.get_task_template(name="target observation")['url']) @@ -313,10 +322,24 @@ class SchedulingTest(unittest.TestCase): client.set_subtask_status(predecessor['id'], 'finished') client.set_subtask_status(ingest_subtask['id'], 'defined') - # trigger + task_blueprint = client.get_url_as_json_object(ingest_subtask['task_blueprint']) + schedulingunit_blueprint = client.get_url_as_json_object(task_blueprint['scheduling_unit_blueprint']) + + # first, make sure we need but do not have ingest persmission... + client.session.patch(schedulingunit_blueprint['url'], json={'ingest_permission_required': True, 'ingest_permission_granted_since': None}) + + with self.assertRaises(Exception) as context: + subtask = client.schedule_subtask(ingest_subtask['id']) + self.assertTrue('permission' in str(context.exception)) + + subtask = client.get_subtask(ingest_subtask['id']) + self.assertEqual('defined', subtask['state_value']) + + # now grant permission... + client.session.patch(schedulingunit_blueprint['url'], json={'ingest_permission_required': True, 'ingest_permission_granted_since': datetime.utcnow().isoformat()}) + subtask = client.schedule_subtask(ingest_subtask['id']) - # assert self.assertEqual('scheduled', subtask['state_value']) self.assertEqual(models.Subtask.objects.get(id=ingest_subtask['id']).inputs.first().dataproducts.count(), 1) diff --git a/SAS/TMSS/backend/test/tmss_test_data_rest.py b/SAS/TMSS/backend/test/tmss_test_data_rest.py index bc90c5ac03346f5226a834a7922545223a37b81d..c595e285831f2ca083bc2d20f3056d033bea721a 100644 --- a/SAS/TMSS/backend/test/tmss_test_data_rest.py +++ b/SAS/TMSS/backend/test/tmss_test_data_rest.py @@ -856,4 +856,9 @@ class TMSSRESTTestDataCreator(): 'PUT': PUT or [], 'PATCH': PATCH or [], 'DELETE': DELETE or [], - 'POST': POST or []} \ No newline at end of file + 'POST': POST or []} + + def wipe_cache(self): + for attr in ['_dataproduct_url', '_subtask_url', '_subtask_output_url', '_subtask_template_url', '_cluster_url', '_cycle_url', '_project_url', '_resource_type_url', '_scheduling_set_url', '_scheduling_unit_blueprint_url', '_task_blueprint_url', '_task_draft_url']: + if hasattr(self, attr): + delattr(self, attr) diff --git a/SAS/TMSS/client/lib/tmss_http_rest_client.py b/SAS/TMSS/client/lib/tmss_http_rest_client.py index baf653c10f8812fa75c5113de0a9c0ceaba11632..25f2b5258879fbe15212c44d49d3d0b4fe4f761c 100644 --- a/SAS/TMSS/client/lib/tmss_http_rest_client.py +++ b/SAS/TMSS/client/lib/tmss_http_rest_client.py @@ -327,18 +327,6 @@ class TMSSsession(object): returns the scheduled subtask upon success, or raises.""" return self.get_path_as_json_object('subtask/%s/schedule' % subtask_id) - def try_schedule_subtask(self, subtask_id: int) -> {}: - """try to schedule the subtask for the given subtask_id. - returns the scheduled subtask upon success. - resets the subtask's state if it ends up in error state.""" - try: - return self.get_path_as_json_object('subtask/%s/schedule' % subtask_id) - except Exception as e: - logger.error(str(e)) - subtask = self.get_subtask(subtask_id) - if subtask['state_value'] == 'error': - self.set_subtask_status(subtask_id, 'defined') - def get_subtask_progress(self, subtask_id: int) -> {}: """get the progress [0.0, 1.0] of a running subtask. returns a dict with the 'id' and 'progress', or raises."""