diff --git a/SAS/DataManagement/Cleanup/CleanupService/test/t_cleanup_tmss_integration_test.py b/SAS/DataManagement/Cleanup/CleanupService/test/t_cleanup_tmss_integration_test.py index 59f112b68dd875336a830bef5de10252dad671c7..143b64776893a476a1efb885c238030675d061aa 100755 --- a/SAS/DataManagement/Cleanup/CleanupService/test/t_cleanup_tmss_integration_test.py +++ b/SAS/DataManagement/Cleanup/CleanupService/test/t_cleanup_tmss_integration_test.py @@ -56,6 +56,7 @@ class TestCleanupTMSSIntegration(unittest.TestCase): scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data()) scheduling_set.project.auto_ingest = False # for user granting permission (in this test the simulator does that for us) + scheduling_set.project.auto_pin = True # all tasks should pin their output data by default scheduling_set.project.save() strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="Short Test Observation - Pipeline - Ingest") @@ -66,11 +67,19 @@ class TestCleanupTMSSIntegration(unittest.TestCase): scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data(template=strategy_template.scheduling_unit_template, scheduling_set=scheduling_set)) - scheduling_unit_draft.ingest_permission_required = False + + # we require explicit ingest permission + # this is set in the TestEventHanler when the pipeline has finished + scheduling_unit_draft.ingest_permission_required = True scheduling_unit_draft.save() scheduling_unit_draft = update_task_graph_from_specifications_doc(scheduling_unit_draft, specifications_doc=scheduling_unit_spec) + cleanup_task_draft = scheduling_unit_draft.task_drafts.get(specifications_template__type__value=models.TaskType.Choices.CLEANUP.value) + self.assertTrue(cleanup_task_draft.output_pinned) + scheduling_unit = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + cleanup_task_blueprint = 
scheduling_unit.task_blueprints.get(specifications_template__type__value=models.TaskType.Choices.CLEANUP.value) + self.assertTrue(cleanup_task_blueprint.output_pinned) # ensure/check the data dir is empty at the start self.assertEqual([], os.listdir(self.TEST_DIR)) @@ -133,6 +142,23 @@ class TestCleanupTMSSIntegration(unittest.TestCase): self._sync_object['observation_did_write_files'] = subtask_did_write_files elif subtask.specifications_template.type.value == models.SubtaskType.Choices.PIPELINE.value: self._sync_object['pipeline_did_write_files'] = subtask_did_write_files + + scheduling_unit_blueprint = subtask.task_blueprint.scheduling_unit_blueprint + assert(scheduling_unit_blueprint.ingest_permission_required) + assert(scheduling_unit_blueprint.ingest_permission_granted_since is None) + + # grant ingest permission. This triggers via event the ingest subtask to be scheduled->started->finished + logger.info("granting ingest permission for scheduling_unit_blueprint %s", scheduling_unit_blueprint.id) + scheduling_unit_blueprint.ingest_permission_granted_since = datetime.utcnow() + scheduling_unit_blueprint.save() + elif subtask.specifications_template.type.value == models.SubtaskType.Choices.INGEST.value: + for task in subtask.task_blueprint.scheduling_unit_blueprint.task_blueprints.all(): + # check if task output is indeed pinned (which follows from project auto_pin) + assert(task.output_pinned) + + logger.info("unpinning output data task id=%s", task.id) + task.output_pinned = False + task.save() elif subtask.specifications_template.type.value == models.SubtaskType.Choices.CLEANUP.value: self._sync_object['cleanup_deleted_written_files'] = not any(os.path.exists(dp.filepath) and os.path.getsize(dp.filepath) > 0 for dp in subtask.input_dataproducts.all()) diff --git a/SAS/TMSS/backend/services/scheduling/lib/subtask_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/subtask_scheduling.py index 
c5168cd778a288ed5d4f99f1e7306c8fd3c20cbc..f3ed9c72c76efb0717574eb9b8bf209c07176dd2 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/subtask_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/lib/subtask_scheduling.py @@ -36,12 +36,78 @@ logger = logging.getLogger(__name__) from lofar.sas.tmss.client.tmssbuslistener import * from lofar.sas.tmss.tmss.tmssapp.models import Subtask, SubtaskState, SubtaskType, SchedulingUnitBlueprint from lofar.sas.tmss.tmss.tmssapp.subtasks import schedule_subtask +from lofar.sas.tmss.tmss.exceptions import TMSSException from lofar.common.datetimeutils import round_to_second_precision from datetime import datetime, timedelta + class TMSSSubTaskSchedulingEventMessageHandler(TMSSEventMessageHandler): ''' ''' + def try_schedule_subtasks(self, subtasks): + '''Try to schedule each subtask in the given iterable, skipping (with an informative log message) every subtask whose prerequisites are not met. + A subtask is skipped when it is obsolete, when it is not in the DEFINED state, when it has unfinished non-obsolete predecessors, when it is an ingest subtask that still needs ingest permission, or when it is a cleanup subtask whose predecessor task output is pinned or whose scheduling unit still has unfinished non-obsolete ingest subtasks. + A TMSSException raised for one subtask is logged, after which the next subtask is tried. + ''' + for subtask in subtasks: + try: + # start with checking the prerequisites before we try to schedule the subtask. + # We could of course just try to schedule it and check for errors, + # but that pollutes the logs with ugly error messages indicating something bad happened. + # It's much nicer to present informative log messages here with reasonable explanations why a subtask cannot be scheduled.
+ + if subtask.is_obsolete: + logger.info("skipping scheduling of obsolete subtask %s", subtask.id) + continue + + if subtask.state.value != SubtaskState.Choices.DEFINED.value: + logger.info("skipping scheduling of subtask %s with state=%s", subtask.id, subtask.state) + continue + + predecessors = subtask.predecessors.all() + if any(predecessor.state.value != SubtaskState.Choices.FINISHED.value and not predecessor.is_obsolete for predecessor in predecessors): + logger.info("skipping scheduling of subtask %s because not all its (non-obsolete) predecessors are finished", subtask.id) + continue + + if subtask.specifications_template.type.value == SubtaskType.Choices.INGEST.value: + # check ingest permission + scheduling_unit_blueprint = subtask.task_blueprint.scheduling_unit_blueprint + if scheduling_unit_blueprint.ingest_permission_required: + if scheduling_unit_blueprint.ingest_permission_granted_since is None or scheduling_unit_blueprint.ingest_permission_granted_since > datetime.utcnow(): + logger.info("skipping scheduling of ingest subtask %s which requires permission from scheduling_unit_blueprint id=%s first", subtask.id, scheduling_unit_blueprint.id) + continue + + if subtask.specifications_template.type.value == SubtaskType.Choices.CLEANUP.value: + # can we schedule this cleanup subtask already, or do we need to wait for ingest to finish and/or pinning + # check pinning + predecessor_tasks_with_pinned_data = [subtask_pred.task_blueprint for subtask_pred in subtask.predecessors if subtask_pred.task_blueprint.output_pinned] + if predecessor_tasks_with_pinned_data: + logger.info("skipping scheduling of cleanup subtask id=%s for which the data of to-be-cleaned-up task_bluerints %s has been pinned", subtask.id, ','.join(str(t.id) for t in predecessor_tasks_with_pinned_data)) + continue
+ + # check ingest status(es) + scheduling_unit_ingest_subtasks = Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint__id=subtask.task_blueprint.scheduling_unit_blueprint.id)\ + .filter(specifications_template__type__value=SubtaskType.Choices.INGEST.value) \ + .exclude(state__value=SubtaskState.Choices.FINISHED.value) \ + .filter(obsolete_since__isnull=True).order_by('id').all() + + if scheduling_unit_ingest_subtasks.exists(): + logger.info("skipping scheduling of cleanup subtask id=%s because not all ingest subtasks in the same scheduling unit are finished yet", subtask.id) + continue + + # ok, all prerequisites have been checked. + # try scheduling the subtask. + # if it succeeds, then the state will be 'scheduled' afterwards + # if there is a specification error, then the state will be 'error' afterwards + # if there is another kind of error (like needing ingest-permission), then the state will be 'defined' afterwards, so you can retry. + # for the ingest-permission we will retry automatically when that permission is granted + schedule_subtask(subtask, start_time=round_to_second_precision(datetime.utcnow()+timedelta(seconds=30))) + except TMSSException as e: + # just log the error and continue with the next subtask.
+ # the user should take action on "fixing" the unschedulable subtask + logger.error(e) + def onSubTaskCreated(self, id: int): + '''Try to schedule the newly created subtask when it has predecessors which are all finished or obsolete.''' super().onSubTaskCreated(id) subtask = Subtask.objects.get(id=id) @@ -49,52 +115,57 @@ class TMSSSubTaskSchedulingEventMessageHandler(TMSSEventMessageHandler): if predecessors.exists() and all([p.state.value==SubtaskState.Choices.FINISHED.value or p.is_obsolete for p in predecessors]): logger.info("subtask %s was just created and all its predecessors are finished or obsolete. trying to schedule it...", id) - subtask = schedule_subtask(subtask, start_time=round_to_second_precision(datetime.utcnow()+timedelta(seconds=30))) - logger.log(logging.INFO if subtask.state.value==SubtaskState.Choices.SCHEDULED.value else logging.WARNING, - "new subtask %s now has state '%s'", id, subtask.state.value) + self.try_schedule_subtasks([subtask]) def onSubTaskStatusChanged(self, id: int, status: str): + '''When a subtask finished: try to schedule its defined, non-obsolete successors; for a finished ingest subtask also try the defined, non-obsolete cleanup subtasks in the same scheduling unit.''' super().onSubTaskStatusChanged(id, status) if status == SubtaskState.Choices.FINISHED.value: subtask = Subtask.objects.get(id=id) - successors = subtask.successors.filter(state__value=SubtaskState.Choices.DEFINED.value).filter(obsolete_since__isnull=True).order_by('id').all() - successor_ids = [s.id for s in successors] - - logger.info("subtask %s finished. trying to %s schedule defined non-obsolete successors: %s", - id, successors.count(), ' '.join(str(id) for id in successor_ids) or 'None') - - for successor in successors: - try: - successor_predecessors = successor.predecessors.all() - - if any([suc_pred.state.value != SubtaskState.Choices.FINISHED.value and not suc_pred.is_obsolete for suc_pred in successor_predecessors]): - logger.info("skipping scheduling of successor subtask %s for finished subtask %s because not all its other (non-obsolete) predecessor subtasks are finished", successor.id, id) - else: - logger.info("trying to schedule successor subtask %s for finished subtask %s", successor.id, id) - # try scheduling the subtask.
- # if it succeeds, then the state will be 'scheduled' afterwards - # if there is a specification error, then the state will be 'error' afterwards - # if there is another kind of error (like needing ingest-permission), then the state will be 'defined' afterwards, so you can retry. - # for the ingest-permission we will retry automatically when that permission is granted - scheduled_successor = schedule_subtask(successor, start_time=round_to_second_precision(datetime.utcnow()+timedelta(seconds=30))) - logger.log(logging.INFO if scheduled_successor.state.value==SubtaskState.Choices.SCHEDULED.value else logging.WARNING, - "successor subtask %s for finished subtask %s now has state '%s'", successor.id, id, scheduled_successor.state.value) - - except Exception as e: - logger.error(e) + successors = subtask.successors.filter(state__value=SubtaskState.Choices.DEFINED.value)\ + .filter(obsolete_since__isnull=True).order_by('id').all() + + if successors: + logger.info("subtask %s finished. trying to schedule %s defined non-obsolete successors: %s", id, successors.count(), ' '.join(str(s.id) for s in successors) or 'None') + self.try_schedule_subtasks(successors) + + if subtask.specifications_template.type.value == SubtaskType.Choices.INGEST.value: + # ingest subtasks have a special type of "successors": the cleanup subtask(s) + # these cleanup subtasks do not depend on the ingest output data, so they are no direct data successor + # these cleanup subtasks do depend on the ingest with the same scheduling unit to be finished.
+ scheduling_unit_cleanup_subtasks = Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint__id=subtask.task_blueprint.scheduling_unit_blueprint.id)\ + .filter(specifications_template__type__value=SubtaskType.Choices.CLEANUP.value) \ + .filter(state__value=SubtaskState.Choices.DEFINED.value) \ + .filter(obsolete_since__isnull=True).order_by('id').all() + if scheduling_unit_cleanup_subtasks.exists(): + logger.info("ingest subtask %s finished. trying to schedule %s defined non-obsolete cleanup successor(s): %s", id, scheduling_unit_cleanup_subtasks.count(), ' '.join(str(s.id) for s in scheduling_unit_cleanup_subtasks) or 'None') + self.try_schedule_subtasks(scheduling_unit_cleanup_subtasks) - def onSchedulingUnitBlueprintIngestPermissionGranted(self, id: int, ingest_permission_granted_since: datetime): - logger.info("ingest_permission_granted_since='%s' for scheduling_unit_blueprint id=%s", ingest_permission_granted_since, id) - scheduling_unit = SchedulingUnitBlueprint.objects.get(id=id) - for task in scheduling_unit.task_blueprints.all(): - for subtask in task.subtasks.all(): - if subtask.state.value == SubtaskState.Choices.DEFINED.value: - if subtask.specifications_template.type.value == SubtaskType.Choices.INGEST.value: - if all(pred.state.value == SubtaskState.Choices.FINISHED.value or pred.is_obsolete for pred in subtask.predecessors.all()): - logger.info("trying to schedule ingest subtask id=%s for scheduling_unit_blueprint id=%s...", subtask.id, id) - schedule_subtask(subtask, start_time=round_to_second_precision(datetime.utcnow()+timedelta(seconds=30))) + def onSchedulingUnitBlueprintIngestPermissionGranted(self, id: int, ingest_permission_granted_since: datetime): + '''Try to schedule the defined, non-obsolete ingest subtasks of the scheduling unit blueprint for which ingest permission was granted.''' + logger.info("ingest_permission_granted_since='%s' for scheduling_unit_blueprint id=%s, trying to schedule ingest subtasks if any, and prerequisites are met", ingest_permission_granted_since, id) + scheduling_unit_ingest_subtasks = 
Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint__id=id)\ + .filter(specifications_template__type__value=SubtaskType.Choices.INGEST.value) \ + .filter(state__value=SubtaskState.Choices.DEFINED.value) \ + .filter(obsolete_since__isnull=True).order_by('id').all() + self.try_schedule_subtasks(scheduling_unit_ingest_subtasks) + + def onTaskBlueprintOutputPinningUpdated(self, id: int, output_pinned: bool): + '''When the output of a task blueprint was unpinned: try to schedule the defined, non-obsolete cleanup subtasks in the same scheduling unit.''' + logger.info("task_blueprint id=%s output_pinned changed to '%s'", id, output_pinned) + if output_pinned: + # pinning output can only block a cleanup, it never makes one schedulable + return + + # a cleanup subtask is blocked by pinned output of its predecessor tasks (see try_schedule_subtasks), not by its own task, + # so consider every cleanup subtask in the scheduling unit of the task that was just unpinned. + scheduling_unit_cleanup_subtasks = Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint__task_blueprints__id=id)\ + .filter(specifications_template__type__value=SubtaskType.Choices.CLEANUP.value) \ + .filter(state__value=SubtaskState.Choices.DEFINED.value) \ + .filter(obsolete_since__isnull=True).order_by('id').all() + self.try_schedule_subtasks(scheduling_unit_cleanup_subtasks) def create_subtask_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER): return TMSSBusListener(handler_type=TMSSSubTaskSchedulingEventMessageHandler, diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0024_patches.py b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0024_patches.py new file mode 100644 index 0000000000000000000000000000000000000000..6df5fb70b62b70ce41e9f3e3fca437a428a8febe --- /dev/null +++ b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0024_patches.py @@ -0,0 +1,168 @@ +from django.db import migrations, models +import django.db.models.deletion +from lofar.sas.tmss.tmss.tmssapp.populate import populate_task_schedulingunit_choices + + +class Migration(migrations.Migration): + + dependencies = [ + ('tmssapp', '0023_subtaskoutput_filesystem'), + ] + + operations = [ + # the following migration step alters the tmssapp_taskblueprint_compute_aggretates_for_schedulingunitblueprint trigger function from migration 0015_db_aggregates + # the only difference is that there is a patch in assigning the scheduling unit 'ingested' status.
+ # yes, this is ugly/annoying that we have to replicate code, but that's how database migration steps/deltas/patches work... + migrations.RunSQL('''CREATE OR REPLACE FUNCTION tmssapp_taskblueprint_compute_aggretates_for_schedulingunitblueprint() + RETURNS trigger AS + $BODY$ + DECLARE + agg_scheduled_start_time timestamp with time zone; + agg_scheduled_stop_time timestamp with time zone; + agg_actual_process_start_time timestamp with time zone; + agg_actual_process_stop_time timestamp with time zone; + agg_actual_on_sky_start_time timestamp with time zone; + agg_actual_on_sky_stop_time timestamp with time zone; + agg_process_start_time timestamp with time zone; + agg_process_stop_time timestamp with time zone; + agg_on_sky_start_time timestamp with time zone; + agg_on_sky_stop_time timestamp with time zone; + agg_observed_start_time timestamp with time zone; + agg_observed_stop_time timestamp with time zone; + agg_status character varying(128); + agg_obsolete_since timestamp with time zone; + agg_on_sky_duration interval; + agg_observed_duration interval; + agg_duration interval; + BEGIN + + -- aggregate the various timestamps + -- consider all the tasks for this schedulingunitblueprint + -- use a WITH Common Table Expression followed by a SELECT INTO variables for speed + WITH task_agg AS (SELECT MIN(scheduled_start_time) as scheduled_start_time, + MAX(scheduled_stop_time) as scheduled_stop_time, + MIN(actual_process_start_time) as actual_process_start_time, + MAX(actual_process_stop_time) as actual_process_stop_time, + SUM(duration) as duration, + MIN(actual_on_sky_start_time) as actual_on_sky_start_time, + MAX(actual_on_sky_stop_time) as actual_on_sky_stop_time, + MIN(process_start_time) as process_start_time, + MAX(process_stop_time) as process_stop_time, + MIN(on_sky_start_time) as on_sky_start_time, + MAX(on_sky_stop_time) as on_sky_stop_time, + SUM(on_sky_duration) as on_sky_duration, + MIN(obsolete_since) as obsolete_since + FROM tmssapp_taskblueprint + 
WHERE tmssapp_taskblueprint.scheduling_unit_blueprint_id=NEW.scheduling_unit_blueprint_id) + SELECT scheduled_start_time, scheduled_stop_time, actual_process_start_time, actual_process_stop_time, actual_on_sky_start_time, actual_on_sky_stop_time, on_sky_start_time, on_sky_stop_time, process_start_time, process_stop_time, on_sky_start_time, on_sky_stop_time, obsolete_since, duration, on_sky_duration + INTO agg_scheduled_start_time, agg_scheduled_stop_time, agg_actual_process_start_time, agg_actual_process_stop_time, agg_actual_on_sky_start_time, agg_actual_on_sky_stop_time, agg_on_sky_start_time, agg_on_sky_stop_time, agg_process_start_time, agg_process_stop_time, agg_on_sky_start_time, agg_on_sky_stop_time, agg_obsolete_since, agg_duration, agg_on_sky_duration + FROM task_agg; + + -- compute observed_start/stop_time observerd_duration. These are derived from finished/observed tasks only. + -- use a WITH Common Table Expression followed by a SELECT INTO variables for speed + WITH observed_task_agg AS (SELECT MIN(on_sky_start_time) as observed_start_time, + MAX(on_sky_stop_time) as observed_stop_time, + SUM(on_sky_duration) as observed_duration + FROM tmssapp_taskblueprint + WHERE tmssapp_taskblueprint.scheduling_unit_blueprint_id=NEW.scheduling_unit_blueprint_id + AND tmssapp_taskblueprint.status_id in ('observed', 'finished')) + SELECT observed_start_time, observed_stop_time, observed_duration + INTO agg_observed_start_time, agg_observed_stop_time, agg_observed_duration + FROM observed_task_agg; + + -- aggregate the task statuses into one schedulingunit status + -- See design: https://support.astron.nl/confluence/display/TMSS/Specification+Flow/#SpecificationFlow-SchedulingUnitBlueprints + -- consider all the taskblueprints for this schedulingunitblueprint + -- use a WITH Common Table Expression followed by a SELECT INTO variables for speed + WITH task_statuses AS (SELECT tmssapp_taskblueprint.status_id, tmssapp_tasktemplate.type_id + FROM tmssapp_taskblueprint + 
INNER JOIN tmssapp_tasktemplate on tmssapp_tasktemplate.id=tmssapp_taskblueprint.specifications_template_id + WHERE tmssapp_taskblueprint.scheduling_unit_blueprint_id=NEW.scheduling_unit_blueprint_id) + SELECT CASE + WHEN (SELECT COUNT(true) FROM task_statuses)=0 THEN 'defined' + WHEN EXISTS(SELECT true FROM task_statuses WHERE status_id='defined') THEN 'defined' + WHEN (SELECT COUNT(true) FROM task_statuses WHERE status_id in ('finished', 'observed'))=(SELECT COUNT(true) FROM task_statuses) THEN 'finished' + WHEN EXISTS(SELECT true FROM task_statuses WHERE status_id='cancelled') THEN 'cancelled' + WHEN EXISTS(SELECT true FROM task_statuses WHERE status_id='error') THEN 'error' + WHEN EXISTS(SELECT true FROM task_statuses WHERE status_id='unschedulable') THEN 'unschedulable' + WHEN EXISTS(SELECT true FROM task_statuses WHERE status_id IN ('queued', 'started', 'observed', 'finished')) THEN + CASE + WHEN (SELECT COUNT(true) FROM task_statuses WHERE type_id='observation' AND status_id IN ('observed', 'finished'))<(SELECT COUNT(true) FROM task_statuses WHERE type_id='observation') THEN 'observing' + WHEN NOT EXISTS(SELECT true FROM task_statuses WHERE type_id='pipeline' AND status_id IN ('queued', 'started', 'finished')) THEN 'observed' + WHEN EXISTS(SELECT true FROM task_statuses WHERE type_id='ingest' AND status_id IN ('queued', 'started')) THEN 'ingesting' + WHEN (SELECT COUNT(true) FROM task_statuses WHERE type_id='ingest' AND status_id='finished')=(SELECT COUNT(true) FROM task_statuses WHERE type_id='ingest') THEN 'ingested' + WHEN (SELECT COUNT(true) FROM task_statuses WHERE type_id='pipeline' AND status_id='finished')=(SELECT COUNT(true) FROM task_statuses WHERE type_id='pipeline') THEN 'processed' + WHEN EXISTS(SELECT true FROM task_statuses WHERE type_id='pipeline' AND status_id IN ('queued', 'started', 'scheduled', 'finished')) THEN 'processing' + END + WHEN EXISTS(SELECT true FROM task_statuses WHERE status_id='scheduled') THEN 'scheduled' + END + INTO 
agg_status + FROM task_statuses; + + -- default when no tasks available + IF agg_status IS NULL THEN + agg_status := 'schedulable'; + END IF; + + -- all aggregated timestamps and statuses were computed + -- now update the referred taskblueprint with these aggregated values + UPDATE tmssapp_schedulingunitblueprint + SET scheduled_start_time=agg_scheduled_start_time, + scheduled_stop_time=agg_scheduled_stop_time, + actual_process_start_time=agg_actual_process_start_time, + actual_process_stop_time=agg_actual_process_stop_time, + actual_on_sky_start_time=agg_actual_on_sky_start_time, + actual_on_sky_stop_time=agg_actual_on_sky_stop_time, + process_start_time=agg_process_start_time, + process_stop_time=agg_process_stop_time, + on_sky_start_time=agg_on_sky_start_time, + on_sky_stop_time=agg_on_sky_stop_time, + observed_start_time=agg_observed_start_time, + observed_stop_time=agg_observed_stop_time, + status_id=agg_status, + obsolete_since=agg_obsolete_since, + on_sky_duration=agg_on_sky_duration, + observed_duration=agg_observed_duration, + duration=agg_duration + WHERE id=NEW.scheduling_unit_blueprint_id; + RETURN NEW; + END; + $BODY$ + LANGUAGE plpgsql VOLATILE; + DROP TRIGGER IF EXISTS tmssapp_taskblueprint_compute_aggretates_for_schedulingunitblueprint ON tmssapp_taskblueprint ; + CREATE TRIGGER tmssapp_taskblueprint_compute_aggretates_for_schedulingunitblueprint + AFTER INSERT OR UPDATE ON tmssapp_taskblueprint + FOR EACH ROW EXECUTE PROCEDURE tmssapp_taskblueprint_compute_aggretates_for_schedulingunitblueprint(); + '''), + + # use database triggers to block updates on blueprint tables for immutable fields + # this one overrides the trigger from 0019_schedulingunitblueprint_immutable_fields.py + migrations.RunSQL('''CREATE OR REPLACE FUNCTION tmssapp_block_scheduling_unit_blueprint_immutable_fields_update() + RETURNS trigger AS + $BODY$ + BEGIN + IF OLD.specifications_template_id <> NEW.specifications_template_id OR + OLD.name <> NEW.name OR + OLD.description <> 
NEW.description OR + (OLD.ingest_permission_granted_since IS NOT NULL AND OLD.ingest_permission_granted_since <> NEW.ingest_permission_granted_since) OR + OLD.ingest_permission_required <> NEW.ingest_permission_required OR + OLD.piggyback_allowed_tbb <> NEW.piggyback_allowed_tbb OR + OLD.piggyback_allowed_aartfaac <> NEW.piggyback_allowed_aartfaac THEN + RAISE EXCEPTION 'ILLEGAL UPDATE OF IMMUTABLE BLUEPRINT FIELD'; + END IF; + IF (OLD.scheduling_constraints_template_id <> NEW.scheduling_constraints_template_id OR + OLD.scheduling_constraints_doc <> NEW.scheduling_constraints_doc) AND + OLD.status_id NOT IN ('defined', 'schedulable', 'unschedulable', 'scheduled') THEN + RAISE EXCEPTION 'ILLEGAL UPDATE OF IMMUTABLE scheduling_constraints FIELD for status=%', OLD.status_id; + END IF; + RETURN NEW; + END; + $BODY$ + LANGUAGE plpgsql VOLATILE; + DROP TRIGGER IF EXISTS tmssapp_block_scheduling_unit_blueprint_immutable_fields_update ON tmssapp_SchedulingUnitBlueprint ; + CREATE TRIGGER tmssapp_block_scheduling_unit_blueprint_immutable_fields_update + BEFORE UPDATE ON tmssapp_SchedulingUnitBlueprint + FOR EACH ROW EXECUTE PROCEDURE tmssapp_block_scheduling_unit_blueprint_immutable_fields_update(); + ''') + + ] diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py index c28a983c5e7fd0d1d25bbce9da9ed18645805aae..60d1274eb18e85d37734b8c12ffaf738991b3e0f 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py @@ -1116,6 +1116,12 @@ class TaskBlueprint(ProjectPropertyMixin, TemplateSchemaMixin, RefreshFromDbInva constraints = [UniqueConstraint(fields=['name', 'scheduling_unit_blueprint'], name='TaskBlueprint_unique_name_in_scheduling_unit')] def save(self, force_insert=False, force_update=False, using=None, update_fields=None): + if self._state.adding: + # On creation, propagate the following task_draft attributes as 
default for this new task_blueprint + for copy_field in ['output_pinned']: + if hasattr(self, 'draft'): + setattr(self, copy_field, getattr(self.draft, copy_field)) + self.annotate_validate_add_defaults_to_doc_using_template('specifications_doc', 'specifications_template') super().save(force_insert, force_update, using, update_fields) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index f4d6b616cb2b1325297e27c3d1e138ce1987d01d..5545edbc725f0ba94e930c8984e81a5dd325a389 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -1961,7 +1961,7 @@ def schedule_ingest_subtask(ingest_subtask: Subtask): SubtaskType.Choices.INGEST.value)) # check permission pre-requisites - scheduling_unit_blueprint = ingest_subtask.task_blueprint.scheduling_unit_blueprint # first() is fine because we assume an ingest subtask does not serve tasks across SU boundaries + scheduling_unit_blueprint = ingest_subtask.task_blueprint.scheduling_unit_blueprint if scheduling_unit_blueprint.ingest_permission_required: if scheduling_unit_blueprint.ingest_permission_granted_since is None or scheduling_unit_blueprint.ingest_permission_granted_since > datetime.utcnow(): raise SubtaskSchedulingPermissionException("Cannot schedule ingest subtask id=%d because it requires explicit permission and the permission has not been granted (yet)" % (ingest_subtask.pk,)) @@ -2047,6 +2047,11 @@ def schedule_cleanup_subtask(cleanup_subtask: Subtask): cleanup_subtask.specifications_template.type, SubtaskType.Choices.CLEANUP.value)) + predecessor_tasks_with_pinned_data = [subtask_pred.task_blueprint for subtask_pred in cleanup_subtask.predecessors if subtask_pred.task_blueprint.output_pinned] + + if predecessor_tasks_with_pinned_data: + raise SubtaskSchedulingSpecificationException("Cannot schedule cleanup subtask id=%d (yet) because the output of task(s) %s is pinned" % (cleanup_subtask.pk, 
','.join(str(t.id) for t in predecessor_tasks_with_pinned_data))) + # step 1: set state to SCHEDULING cleanup_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value) cleanup_subtask.save() diff --git a/SAS/TMSS/backend/test/t_scheduling.py b/SAS/TMSS/backend/test/t_scheduling.py index a375f0049e913e3e08ad174e317d65530e276475..1b69bfe1b0f18d7f9b21cdc41d6695ec29bd8302 100755 --- a/SAS/TMSS/backend/test/t_scheduling.py +++ b/SAS/TMSS/backend/test/t_scheduling.py @@ -820,10 +820,12 @@ class SubtaskInputOutputTest(unittest.TestCase): selection_template=trsel_template, selection_doc=trsel_template.get_default_json_document_for_schema()) - # convert to blueprint, ingest_permission_required = False + + su_draft.ingest_permission_required = False + su_draft.save() + + # convert to blueprint su_blueprint = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(su_draft) - su_blueprint.ingest_permission_required = False - su_blueprint.save() # fetch the created subtasks obs_subtak = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint=su_blueprint, specifications_template__type__value='observation') diff --git a/SAS/TMSS/backend/test/t_tmssapp_specification_django_API.py b/SAS/TMSS/backend/test/t_tmssapp_specification_django_API.py index 7e37bf7bde152b8f4dd5301831a94b1c24beee15..3767b6e562d86afd9489a2c656983024eb51bbd9 100755 --- a/SAS/TMSS/backend/test/t_tmssapp_specification_django_API.py +++ b/SAS/TMSS/backend/test/t_tmssapp_specification_django_API.py @@ -976,6 +976,16 @@ class TaskBlueprintTest(unittest.TestCase): self.assertEqual(set(), set(task_blueprint_1.successors.all())) self.assertEqual(set(), set(task_blueprint_2.successors.all())) + + def test_TaskBlueprint_copies_fields_from_draft(self): + '''A new TaskBlueprint takes over output_pinned from the TaskDraft it is created from.''' + for output_pinned in (True, False): + # run each value as its own sub-test, so a failure reports which output_pinned value broke + with self.subTest(output_pinned=output_pinned): + draft = models.TaskDraft.objects.create(**TaskDraft_test_data(output_pinned=output_pinned)) + blueprint = 
models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(task_draft=draft)) + self.assertEqual(output_pinned, draft.output_pinned) + self.assertEqual(output_pinned, blueprint.output_pinned) + def test_TaskBlueprint_predecessors_and_successors_simple(self): task_blueprint_1: models.TaskBlueprint = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(name=str(uuid.uuid4()), task_draft=self.task_draft, scheduling_unit_blueprint=self.scheduling_unit_blueprint)) task_blueprint_2: models.TaskBlueprint = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(name=str(uuid.uuid4()), task_draft=self.task_draft, scheduling_unit_blueprint=self.scheduling_unit_blueprint))