Skip to content
Snippets Groups Projects
Commit ea50d329 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-1414: check and test for pinning

parent 8b08d653
No related branches found
No related tags found
1 merge request!721TMSS-1414
...@@ -56,6 +56,7 @@ class TestCleanupTMSSIntegration(unittest.TestCase): ...@@ -56,6 +56,7 @@ class TestCleanupTMSSIntegration(unittest.TestCase):
scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data()) scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data())
scheduling_set.project.auto_ingest = False # for user granting permission (in this test the simulator does that for us) scheduling_set.project.auto_ingest = False # for user granting permission (in this test the simulator does that for us)
scheduling_set.project.auto_pin = True # all tasks should pin their output data by default
scheduling_set.project.save() scheduling_set.project.save()
strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="Short Test Observation - Pipeline - Ingest") strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="Short Test Observation - Pipeline - Ingest")
...@@ -66,11 +67,19 @@ class TestCleanupTMSSIntegration(unittest.TestCase): ...@@ -66,11 +67,19 @@ class TestCleanupTMSSIntegration(unittest.TestCase):
scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data(template=strategy_template.scheduling_unit_template, scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data(template=strategy_template.scheduling_unit_template,
scheduling_set=scheduling_set)) scheduling_set=scheduling_set))
scheduling_unit_draft.ingest_permission_required = False
# we require explicit ingest permission
# this is set in the TestEventHanler when the pipeline has finished
scheduling_unit_draft.ingest_permission_required = True
scheduling_unit_draft.save() scheduling_unit_draft.save()
scheduling_unit_draft = update_task_graph_from_specifications_doc(scheduling_unit_draft, specifications_doc=scheduling_unit_spec) scheduling_unit_draft = update_task_graph_from_specifications_doc(scheduling_unit_draft, specifications_doc=scheduling_unit_spec)
cleanup_task_draft = scheduling_unit_draft.task_drafts.get(specifications_template__type__value=models.TaskType.Choices.CLEANUP.value)
self.assertTrue(cleanup_task_draft.output_pinned)
scheduling_unit = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) scheduling_unit = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft)
cleanup_task_blueprint = scheduling_unit.task_blueprints.get(specifications_template__type__value=models.TaskType.Choices.CLEANUP.value)
self.assertTrue(cleanup_task_blueprint.output_pinned)
# ensure/check the data dir is empty at the start # ensure/check the data dir is empty at the start
self.assertEqual([], os.listdir(self.TEST_DIR)) self.assertEqual([], os.listdir(self.TEST_DIR))
...@@ -133,6 +142,23 @@ class TestCleanupTMSSIntegration(unittest.TestCase): ...@@ -133,6 +142,23 @@ class TestCleanupTMSSIntegration(unittest.TestCase):
self._sync_object['observation_did_write_files'] = subtask_did_write_files self._sync_object['observation_did_write_files'] = subtask_did_write_files
elif subtask.specifications_template.type.value == models.SubtaskType.Choices.PIPELINE.value: elif subtask.specifications_template.type.value == models.SubtaskType.Choices.PIPELINE.value:
self._sync_object['pipeline_did_write_files'] = subtask_did_write_files self._sync_object['pipeline_did_write_files'] = subtask_did_write_files
scheduling_unit_blueprint = subtask.task_blueprint.scheduling_unit_blueprint
assert(scheduling_unit_blueprint.ingest_permission_required)
assert(scheduling_unit_blueprint.ingest_permission_granted_since is None)
# grant ingest permission. This triggers via event the ingest subtask to be scheduled->started->finished
logger.info("granting ingest permission for scheduling_unit_blueprint %s", scheduling_unit_blueprint.id)
scheduling_unit_blueprint.ingest_permission_granted_since = datetime.utcnow()
scheduling_unit_blueprint.save()
elif subtask.specifications_template.type.value == models.SubtaskType.Choices.INGEST.value:
for task in subtask.task_blueprint.scheduling_unit_blueprint.task_blueprints.all():
# check if task output is indeed pinned (which follows from project auto_pin)
assert(task.output_pinned)
logger.info("unpinning output data task id=%s", task.id)
task.output_pinned = False
task.save()
elif subtask.specifications_template.type.value == models.SubtaskType.Choices.CLEANUP.value: elif subtask.specifications_template.type.value == models.SubtaskType.Choices.CLEANUP.value:
self._sync_object['cleanup_deleted_written_files'] = not any(os.path.exists(dp.filepath) and os.path.getsize(dp.filepath) > 0 self._sync_object['cleanup_deleted_written_files'] = not any(os.path.exists(dp.filepath) and os.path.getsize(dp.filepath) > 0
for dp in subtask.input_dataproducts.all()) for dp in subtask.input_dataproducts.all())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment