diff --git a/atdb/taskdatabase/models.py b/atdb/taskdatabase/models.py index 63a50f229429c2b7731b37304e3e7071853fd5f0..290c5e8ba6793b882d29f399a6825e578b1670b0 100644 --- a/atdb/taskdatabase/models.py +++ b/atdb/taskdatabase/models.py @@ -115,6 +115,9 @@ class Activity(models.Model): # set by update_activity, used by Validation Page is_verified = models.BooleanField(default=False) + # flag set by ATDB to indicate that all tasks of this Activity have been processed + is_processed = models.BooleanField(default=False) + # TODO: flag set by the 'validate' step in ATDB, used by ancillary service is_validated = models.BooleanField(default=False) @@ -230,12 +233,19 @@ class Task(models.Model): tasks_for_this_sasid = Task.objects.filter(sas_id=self.sas_id) self.calculated_qualities = qualities.calculate_qualities(self, tasks_for_this_sasid, quality_thresholds) - # nv:20feb2024, check if this task is a summary task + # when a task goes to PROCESSED... if (self.status != State.PROCESSED.value) & (self.new_status == State.PROCESSED.value): + # check if this task is a summary task self.is_summary = check_if_summary(self) + # if so, temporarily put it on hold so that the ancillary service can play with it + if self.is_summary: + self.resume = False + # nv:20feb2024, same as above, but for backward compatibilty reasons. # For tasks that are already beyond PROCESSED, but not yet ingested. 
+ # But it is of no use to put them on 'hold' for the ancillary service, + # because at this point the spider work directory has already been deleted by datamanager if (self.status != State.VALIDATED.value) & (self.new_status == State.VALIDATED.value): self.is_summary = check_if_summary(self) diff --git a/atdb/taskdatabase/services/activities_handler.py b/atdb/taskdatabase/services/activities_handler.py index fa44983408cd844a87f09f477e6a036b85c6821f..860589b73e604651d513cccb4173c1158b2c3945 100644 --- a/atdb/taskdatabase/services/activities_handler.py +++ b/atdb/taskdatabase/services/activities_handler.py @@ -1,5 +1,5 @@ import logging; -from .common import State, verified_statusses +from .common import State, verified_statusses, processed_statusses from taskdatabase.models import Task, Activity logger = logging.getLogger(__name__) @@ -129,6 +129,25 @@ def update_activity(task): activity.remaining = result['remaining'] activity.save() + + # check if all tasks of this SAS_ID have a status that is considered 'processed' + # this is used as a trigger for the ancillary service + if task.status in processed_statusses: + current_is_processed = activity.is_processed + activity.is_processed = True + for t in Task.objects.filter(sas_id=task.sas_id): + if t.status not in processed_statusses: + activity.is_processed = False + break + + # only save when changed + if activity.is_processed != current_is_processed: + # if the whole activity has become processed, then set the status of this activity to 'HANDLE_ANCILLARY' + if activity.is_processed: + activity.status = State.HANDLE_ANCILLARY.value + + activity.save() + # check if all tasks of this SAS_ID have a status that is considered 'verified' # this is used for the Validation Page current_is_verified = activity.is_verified diff --git a/atdb/taskdatabase/services/common.py b/atdb/taskdatabase/services/common.py index 91ebf5a932e8b395a24a7dbcb0f1bfe99bb99a8f..00105aedacf1dc320ac45d33020039fdd7b828a6 100644 --- 
a/atdb/taskdatabase/services/common.py +++ b/atdb/taskdatabase/services/common.py @@ -8,10 +8,13 @@ from enum import Enum logger = logging.getLogger(__name__) class State(Enum): + UNKNOWN = "unknown" DEFINED = "defined" STAGED = "staged" FETCHED = "fetched" PROCESSED = "processed" + HANDLE_ANCILLARY = "handle_ancillary" + ANCILLARY_HANDLED = "ancillary_handled" STORED = 'stored' VALIDATED = "validated" SCRUBBED = "scrubbed" diff --git a/atdb/taskdatabase/templates/taskdatabase/index.html b/atdb/taskdatabase/templates/taskdatabase/index.html index 334825690f190ee172d6900dae9f936be67c277d..05c6414f8a45eba9c646d2d09cff77a2488c5a1a 100644 --- a/atdb/taskdatabase/templates/taskdatabase/index.html +++ b/atdb/taskdatabase/templates/taskdatabase/index.html @@ -31,7 +31,7 @@ {% include 'taskdatabase/pagination.html' %} </div> </div> - <p class="footer"> Version 29 Mar 2024 + <p class="footer"> Version 1 Apr 2024 </div> {% include 'taskdatabase/refresh.html' %} diff --git a/atdb/taskdatabase/tests/test_models_processed_summary.py b/atdb/taskdatabase/tests/test_models_processed_summary.py new file mode 100644 index 0000000000000000000000000000000000000000..4030f586fb1b375a494454ff75cb076574852df6 --- /dev/null +++ b/atdb/taskdatabase/tests/test_models_processed_summary.py @@ -0,0 +1,49 @@ +from django.test import TestCase +import json +from taskdatabase.models import Task, Workflow, Activity +from taskdatabase.services.common import State + +class TestProcessedSummary(TestCase): + + def setUp(self): + """ + initialize test data + """ + self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation") + self.workflow_requantisation.save() + + self.task1 = Task.objects.create(sas_id=222, + filter="test_blabla", + new_status=State.PROCESSED.value, + workflow=self.workflow_requantisation, + is_summary=False) + self.task1.save() + self.task2 = Task.objects.create(sas_id=222, + new_status=State.PROCESSED.value, + workflow=self.workflow_requantisation, + 
outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]}) + self.task2.save() + + def test_processed_not_on_hold(self): + """ + task 1 is processed, but not a summary dataproduct. Should NOT go on hold + """ + + actual = self.task1.resume + self.assertEqual(actual, True) + + def test_processed_on_hold(self): + """ + task 2 is processed, and a summary dataproduct. Should go on hold + """ + + actual = self.task2.resume + self.assertEqual(actual, False) + + def test_activity_is_processed(self): + """ + both tasks are processed, the activity should have the is_processed flag now + """ + + actual = self.task1.activity.is_processed + self.assertEqual(actual, True) \ No newline at end of file diff --git a/atdb/taskdatabase/tests/test_summary_tasks.py b/atdb/taskdatabase/tests/test_summary_tasks.py index bf33c9f75d68202b5ecfec0e0e801e81c50a1f40..32028129a12cc70d94031ea16462baa6b2427024 100644 --- a/atdb/taskdatabase/tests/test_summary_tasks.py +++ b/atdb/taskdatabase/tests/test_summary_tasks.py @@ -14,8 +14,8 @@ class TestSummaryTasks(TestCase): outputs={"tar_archive": [{"size": 4885985280, "basename": "L621240_SAP002_B073_P000_bf.tar", "nameroot": "L621240_SAP002_B073_P000_bf"}]}) self.summary_task_defined = Task.objects.create(sas_id=77777, new_status=State.DEFINED.value, workflow=self.workflow_requantisation, outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]}) - self.summary_task_stored = Task.objects.create(sas_id=77777, new_status=State.STORED.value, workflow=self.workflow_requantisation, - outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]}) + self.summary_task_validated = Task.objects.create(sas_id=77777, new_status=State.VALIDATED.value, workflow=self.workflow_requantisation, + outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": 
"L185619_summaryCS"}]}) def test_no_summary_task(self): """ @@ -37,6 +37,6 @@ class TestSummaryTasks(TestCase): """ test summary task, at 'stored' it should know that it is a summary task and return True) """ - self.summary_task_stored.save() - actual = self.summary_task_stored.is_summary + self.summary_task_validated.save() + actual = self.summary_task_validated.is_summary self.assertEqual(actual, True) diff --git a/atdb/taskdatabase/tests/test_update_activity.py b/atdb/taskdatabase/tests/test_update_activity.py index cd015954369450045a47354dda962c44d2ecf10a..68b4f5ac23d14d81d920b18ea177370eccb8aeef 100644 --- a/atdb/taskdatabase/tests/test_update_activity.py +++ b/atdb/taskdatabase/tests/test_update_activity.py @@ -1,6 +1,7 @@ from django.test import TestCase import json from taskdatabase.models import Task, Workflow, Activity +from taskdatabase.services.common import State class TestUpdateActivity(TestCase): @@ -46,6 +47,42 @@ class TestUpdateActivity(TestCase): size_to_process=1000, size_processed=500) + self.task6 = Task.objects.create(sas_id=111, + new_status='stored', + workflow=self.workflow_requantisation, + size_to_process=1000, + size_processed=500) + self.task6.save() + self.task7 = Task.objects.create(sas_id=111, + new_status='processed', + workflow=self.workflow_requantisation, + size_to_process=1000, + size_processed=500) + self.task7.save() + self.task8 = Task.objects.create(sas_id=111, + new_status='processed', + workflow=self.workflow_requantisation, + size_to_process=1000, + size_processed=500, + outputs={"tar_archive": [ + {"size": 4885985280, "basename": "L185619_summaryCS.tar", + "nameroot": "L185619_summaryCS"}]} + ) + self.task8.save() + self.task9 = Task.objects.create(sas_id=112, + new_status='processing', + workflow=self.workflow_requantisation, + size_to_process=1000, + size_processed=500) + self.task9.save() + self.task10 = Task.objects.create(sas_id=112, + new_status='processed', + workflow=self.workflow_requantisation, + 
size_to_process=1000, + size_processed=500, + outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]}) + self.task10.save() + def test_created_activity(self): """ test if activity is created @@ -137,3 +174,27 @@ class TestUpdateActivity(TestCase): actual = activity.workflow_id self.assertEqual(actual, 22) + def test_is_not_processed(self): + """ + task 9 is not processed, task 10 is processed. + The activity.is_processed should be false + """ + + activity = self.task9.activity + + actual = activity.is_processed + self.assertEqual(actual, False) + + def test_is_processed(self): + """ + task 6, 7 and 8 are processed, + activity.is_processed should be true and activity status should go to 'handle_ancillary' + """ + + activity = self.task6.activity + + actual = activity.is_processed + self.assertEqual(actual, True) + + actual = activity.status + self.assertEqual(actual, State.HANDLE_ANCILLARY.value) \ No newline at end of file