diff --git a/atdb/taskdatabase/models.py b/atdb/taskdatabase/models.py index 6af6728dea29566905c7c5553d930907154615af..bc3c82bae1405f86267b821da45be8e959689120 100644 --- a/atdb/taskdatabase/models.py +++ b/atdb/taskdatabase/models.py @@ -157,7 +157,7 @@ class Activity(models.Model): def create_storage_location(self): workdir = Configuration.objects.get(key='executor:workdir').value - self.storage_location = workdir.replace('run', 'aggregate') + self.sas_id + self.storage_location = workdir.replace('run', 'aggregate') + str(self.sas_id) def check_if_summary(task): diff --git a/atdb/taskdatabase/services/activities_handler.py b/atdb/taskdatabase/services/activities_handler.py index 71eb44f5f57c1a6b6cf19180edca61e5cce44c9e..17834aefe3102cbf6c88e6fbd4418af557e16c8d 100644 --- a/atdb/taskdatabase/services/activities_handler.py +++ b/atdb/taskdatabase/services/activities_handler.py @@ -189,7 +189,7 @@ def update_processed_and_aggregate(task): if (not activity.is_aggregated and activity.status != State.COLLECTING_DATA.value): # check if there is already a storage_location, if not, add it. - if activity.storage_location == "unknown": + if not activity.storage_location: activity.create_storage_location() # this means that its tasks know that they should copy their H5 files to the storage_location diff --git a/atdb/taskdatabase/tests/test_models_processed_summary.py b/atdb/taskdatabase/tests/test_models_processed_summary.py index 19c7560930e664cac47fe4ac984051b2e4c2477a..2c317872ab717e06b1d6cfa120103d074603d2d2 100644 --- a/atdb/taskdatabase/tests/test_models_processed_summary.py +++ b/atdb/taskdatabase/tests/test_models_processed_summary.py @@ -1,6 +1,6 @@ from django.test import TestCase import json -from taskdatabase.models import Task, Workflow, Activity +from taskdatabase.models import Configuration, Task, Workflow, Activity from taskdatabase.services.common import State class TestProcessedSummary(TestCase): @@ -9,6 +9,8 @@ class TestProcessedSummary(TestCase): """ initialize test data """ + Configuration.objects.create(key="executor:workdir", value="/project/ldv/Share/run/") + self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation", aggregation_strategy="wait_for_summary_task") self.workflow_requantisation.save() diff --git a/atdb/taskdatabase/tests/test_update_activity.py b/atdb/taskdatabase/tests/test_update_activity.py index 18d6af3fad9ee29096e70c209c2bb02f100e11c0..d29c74ba5f31b30dd7e87580a6062a80cbd0d309 100644 --- a/atdb/taskdatabase/tests/test_update_activity.py +++ b/atdb/taskdatabase/tests/test_update_activity.py @@ -1,7 +1,7 @@ from django.test import TestCase import json -from taskdatabase.models import Task, Workflow, Activity -from taskdatabase.services.common import State +from taskdatabase.models import Configuration, Task, Workflow, Activity +from taskdatabase.services.common import State, AggregationStrategy class TestUpdateActivity(TestCase): @@ -9,9 +9,16 @@ class TestUpdateActivity(TestCase): """ initialize test data """ + # used to create the activity.storage_location + Configuration.objects.create(key="executor:workdir", value="/project/ldv/Share/run/") + self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation") self.workflow_requantisation.save() + self.workflow_imaging_compression = Workflow(id=28, workflow_uri="imaging_compression", + aggregation_strategy = AggregationStrategy.COLLECT_H5.value) + self.workflow_imaging_compression.save() + self.task1 = Task.objects.create(sas_id=12345, filter="test_blabla", status='stored', @@ -83,6 +90,13 @@ class TestUpdateActivity(TestCase): outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]}) self.task10.save() + # to test imaging + self.task11 = Task.objects.create(sas_id=113, + new_status='fetched', + workflow=self.workflow_imaging_compression, + outputs={"inspect": {"location": "file:///project/ldv/Share/run/2023/3/26/331_30608/inspect.h5", "basename": "inspect.h5", "nameroot": "inspect"}}) + self.task10.save() + def test_created_activity(self): """ test if activity is created @@ -180,4 +194,26 @@ class TestUpdateActivity(TestCase): # check if the activity is reset self.assertFalse(activity.is_aggregated) - self.assertFalse(activity.is_processed) \ No newline at end of file + self.assertFalse(activity.is_processed) + + def test_create_storage_location(self): + """ + create activity.storage_location + + WHEN a task goes to processed, + and its workflow has the COLLECT_H5 aggregation strategy + and its activity does not have a 'storage_location' yet + """ + + activity = self.task11.activity + + # check initial state + self.assertEqual(activity.storage_location, None) + + # simulate task to PROCESSED + self.task11.new_status = State.PROCESSED.value + self.task11.save() + + expected = "/project/ldv/Share/aggregate/113" + actual = self.task11.activity.storage_location + self.assertEqual(actual, expected)