From 198dc76f0755de1989e44b09351d5c46d6471111 Mon Sep 17 00:00:00 2001
From: Vermaas <vermaas@astron.nl>
Date: Tue, 30 Jul 2024 17:31:15 +0200
Subject: [PATCH] create aggregation task

---
 atdb/taskdatabase/models.py                         | 9 ++++++---
 atdb/taskdatabase/serializers.py                    | 2 +-
 atdb/taskdatabase/services/activities_handler.py    | 6 ++++--
 atdb/taskdatabase/templates/taskdatabase/index.html | 2 +-
 4 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/atdb/taskdatabase/models.py b/atdb/taskdatabase/models.py
index adb0cc5d..071fca44 100644
--- a/atdb/taskdatabase/models.py
+++ b/atdb/taskdatabase/models.py
@@ -8,7 +8,7 @@ import json
 import logging
 
 from .services import calculated_qualities as qualities
-from .services.common import State, AggregationStrategy
+from .services.common import State, AggregationStrategy, ACTIVITY_RESET_STATUSSEN
 
 logger = logging.getLogger(__name__)
 
@@ -224,8 +224,8 @@ class Task(models.Model):
                     # so that it won't be picked up again.
 
                     # TODO: only activate when the aggregator service actually picks this up
-                    #self.new_status = State.AGGREGATE.value
-                    pass
+                    self.new_status = State.AGGREGATE.value
+                    #pass
 
 
         except Exception as error:
@@ -260,6 +260,9 @@ class Task(models.Model):
         if (self.status != State.PROCESSED.value) & (self.new_status == State.PROCESSED.value):
             self.handle_aggregation()
 
+        if self.status in ACTIVITY_RESET_STATUSSEN:
+            self.is_aggregated = False
+
         # check in the outputs if this task should be considered to be summary task
         self.is_summary = check_if_summary(self)
 
diff --git a/atdb/taskdatabase/serializers.py b/atdb/taskdatabase/serializers.py
index d1a28668..75fb231a 100644
--- a/atdb/taskdatabase/serializers.py
+++ b/atdb/taskdatabase/serializers.py
@@ -45,7 +45,7 @@ class TaskWriteSerializer(serializers.ModelSerializer):
         model = Task
         fields = ('id','task_type','filter','predecessor','successors',
                   'joined_output_task',
-                  'project','sas_id','priority','purge_policy','cleanup_policy','resume',
+                  'project','sas_id','priority','purge_policy','cleanup_policy','resume','is_aggregated',
                   'new_workflow_id','new_workflow_uri','workflow',
                   'stage_request_id',
                   'status','new_status','quality','calculated_qualities',
diff --git a/atdb/taskdatabase/services/activities_handler.py b/atdb/taskdatabase/services/activities_handler.py
index 14a69f6d..4f702918 100644
--- a/atdb/taskdatabase/services/activities_handler.py
+++ b/atdb/taskdatabase/services/activities_handler.py
@@ -178,7 +178,7 @@ def update_processed_and_aggregate(task):
     current_is_processed = activity.is_processed
     activity.is_processed = True
     non_discarded_found = False
-    for t in Task.objects.filter(sas_id=task.sas_id):
+    for t in Task.objects.filter(sas_id=task.sas_id,task_type='regular'):
         if t.status not in PROCESSED_STATUSSES:
             activity.is_processed = False
             break
@@ -202,7 +202,9 @@ def update_processed_and_aggregate(task):
     if (task.workflow.aggregation_strategy == AggregationStrategy.COLLECT_H5.value):
 
         # check if the activity is ready to collect H5 data
-        if (not activity.is_aggregated and activity.status != State.COLLECTING_DATA.value):
+        if (not activity.is_aggregated and
+                activity.status != State.COLLECTING_DATA.value and
+                activity.status != State.AGGREGATE.value):
 
             # check if there is already a storage_location, if not, add it.
             if not activity.storage_location:
diff --git a/atdb/taskdatabase/templates/taskdatabase/index.html b/atdb/taskdatabase/templates/taskdatabase/index.html
index 20043ea4..f50f7b8f 100644
--- a/atdb/taskdatabase/templates/taskdatabase/index.html
+++ b/atdb/taskdatabase/templates/taskdatabase/index.html
@@ -31,7 +31,7 @@
             {% include 'taskdatabase/pagination.html' %}
         </div>
     </div>
-    <p class="footer"> Version 29 Jul 2024
+    <p class="footer"> Version 30 Jul 2024
 </div>
 
 {% include 'taskdatabase/refresh.html' %}
-- 
GitLab