diff --git a/SAS/DataManagement/Cleanup/CleanupService/test/t_cleanup_tmss_integration_test.py b/SAS/DataManagement/Cleanup/CleanupService/test/t_cleanup_tmss_integration_test.py index 59f112b68dd875336a830bef5de10252dad671c7..143b64776893a476a1efb885c238030675d061aa 100755 --- a/SAS/DataManagement/Cleanup/CleanupService/test/t_cleanup_tmss_integration_test.py +++ b/SAS/DataManagement/Cleanup/CleanupService/test/t_cleanup_tmss_integration_test.py @@ -56,6 +56,7 @@ class TestCleanupTMSSIntegration(unittest.TestCase): scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data()) scheduling_set.project.auto_ingest = False # for user granting permission (in this test the simulator does that for us) + scheduling_set.project.auto_pin = True # all tasks should pin their output data by default scheduling_set.project.save() strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="Short Test Observation - Pipeline - Ingest") @@ -66,11 +67,19 @@ class TestCleanupTMSSIntegration(unittest.TestCase): scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data(template=strategy_template.scheduling_unit_template, scheduling_set=scheduling_set)) - scheduling_unit_draft.ingest_permission_required = False + + # we require explicit ingest permission + # this is set in the TestEventHanler when the pipeline has finished + scheduling_unit_draft.ingest_permission_required = True scheduling_unit_draft.save() scheduling_unit_draft = update_task_graph_from_specifications_doc(scheduling_unit_draft, specifications_doc=scheduling_unit_spec) + cleanup_task_draft = scheduling_unit_draft.task_drafts.get(specifications_template__type__value=models.TaskType.Choices.CLEANUP.value) + self.assertTrue(cleanup_task_draft.output_pinned) + scheduling_unit = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + cleanup_task_blueprint = scheduling_unit.task_blueprints.get(specifications_template__type__value=models.TaskType.Choices.CLEANUP.value) + self.assertTrue(cleanup_task_blueprint.output_pinned) # ensure/check the data dir is empty at the start self.assertEqual([], os.listdir(self.TEST_DIR)) @@ -133,6 +142,23 @@ class TestCleanupTMSSIntegration(unittest.TestCase): self._sync_object['observation_did_write_files'] = subtask_did_write_files elif subtask.specifications_template.type.value == models.SubtaskType.Choices.PIPELINE.value: self._sync_object['pipeline_did_write_files'] = subtask_did_write_files + + scheduling_unit_blueprint = subtask.task_blueprint.scheduling_unit_blueprint + assert(scheduling_unit_blueprint.ingest_permission_required) + assert(scheduling_unit_blueprint.ingest_permission_granted_since is None) + + # grant ingest permission. This triggers via event the ingest subtask to be scheduled->started->finished + logger.info("granting ingest permission for scheduling_unit_blueprint %s", scheduling_unit_blueprint.id) + scheduling_unit_blueprint.ingest_permission_granted_since = datetime.utcnow() + scheduling_unit_blueprint.save() + elif subtask.specifications_template.type.value == models.SubtaskType.Choices.INGEST.value: + for task in subtask.task_blueprint.scheduling_unit_blueprint.task_blueprints.all(): + # check if task output is indeed pinned (which follows from project auto_pin) + assert(task.output_pinned) + + logger.info("unpinning output data task id=%s", task.id) + task.output_pinned = False + task.save() elif subtask.specifications_template.type.value == models.SubtaskType.Choices.CLEANUP.value: self._sync_object['cleanup_deleted_written_files'] = not any(os.path.exists(dp.filepath) and os.path.getsize(dp.filepath) > 0 for dp in subtask.input_dataproducts.all())