Skip to content
Snippets Groups Projects
Commit 84f6bae0 authored by Nico Vermaas's avatar Nico Vermaas
Browse files

fix storage_location creation

parent a40a7672
Branches
No related tags found
1 merge request!361SDC-1423 use aggregation strategy
Pipeline #89824 passed
...@@ -110,7 +110,7 @@ class Activity(models.Model): ...@@ -110,7 +110,7 @@ class Activity(models.Model):
def __str__(self): def __str__(self):
return str(self.sas_id) return str(self.sas_id)
def create_storage_location(self, task): def create_storage_location(self):
workdir = Configuration.objects.get(key='executor:workdir').value workdir = Configuration.objects.get(key='executor:workdir').value
self.storage_location = workdir.replace('run', 'aggregate') + str(self.sas_id) self.storage_location = workdir.replace('run', 'aggregate') + str(self.sas_id)
......
...@@ -202,6 +202,11 @@ def update_processed_and_aggregate(task): ...@@ -202,6 +202,11 @@ def update_processed_and_aggregate(task):
if (task.workflow.aggregation_strategy == AggregationStrategy.COLLECT_H5.value): if (task.workflow.aggregation_strategy == AggregationStrategy.COLLECT_H5.value):
# check if there is already a storage_location, if not, add it.
if not activity.storage_location:
# for this aggregation_strategy, the activity storage_location is the workdir of the aggregation task
activity.create_storage_location()
# check if the activity is ready to collect H5 data # check if the activity is ready to collect H5 data
if (not activity.is_aggregated and if (not activity.is_aggregated and
activity.status != State.COLLECTING_DATA.value and activity.status != State.COLLECTING_DATA.value and
...@@ -215,11 +220,6 @@ def update_processed_and_aggregate(task): ...@@ -215,11 +220,6 @@ def update_processed_and_aggregate(task):
else: else:
aggregation_task = aggregation_tasks[0] aggregation_task = aggregation_tasks[0]
# check if there is already a storage_location, if not, add it.
if not activity.storage_location:
# for this aggregation_strategy, the activity storage_location is the workdir of the aggregation task
activity.create_storage_location()
# this means that its tasks know that they should copy their H5 files to the storage_location # this means that its tasks know that they should copy their H5 files to the storage_location
# (but the tasks cannot do that, the aggregator service does) # (but the tasks cannot do that, the aggregator service does)
activity.status = State.COLLECTING_DATA.value activity.status = State.COLLECTING_DATA.value
......
...@@ -216,4 +216,4 @@ class TestUpdateActivity(TestCase): ...@@ -216,4 +216,4 @@ class TestUpdateActivity(TestCase):
expected = "/project/ldv/Share/aggregate/113" expected = "/project/ldv/Share/aggregate/113"
actual = self.task11.activity.storage_location actual = self.task11.activity.storage_location
#self.assertEqual(actual, expected) self.assertEqual(actual, expected)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment